Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MessageConsumer XA Commit and Rollback tests for artemis-jms #320

Merged
merged 1 commit into from
Nov 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ public String receive() {
message.acknowledge();
return message.getBodyBuffer().readString();
}
} catch (ActiveMQException e) {
} catch (ActiveMQException | NullPointerException e) {
throw new RuntimeException("Could not receive message", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public void send(String body) {
try (ClientProducer producer = session.createProducer(queueName)) {
producer.send(message);
}
} catch (ActiveMQException e) {
} catch (ActiveMQException | NullPointerException e) {
throw new RuntimeException("Could not send message", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public String receive() {
try (JMSContext context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
JMSConsumer consumer = context.createConsumer(context.createQueue(queueName))) {
return consumer.receive(1000L).getBody(String.class);
} catch (JMSException e) {
} catch (JMSException | NullPointerException e) {
throw new RuntimeException("Could not receive message", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.quarkus.it.artemis.jms.common;

import jakarta.jms.*;
import jakarta.transaction.RollbackException;
import jakarta.transaction.Synchronization;
import jakarta.transaction.SystemException;
import jakarta.transaction.TransactionManager;

public class ArtemisJmsXaConsumerManager extends ArtemisJmsConsumerManager {
private final XAConnectionFactory xaConnectionFactory;
private final TransactionManager tm;
private String queueName;

/**
* This constructor exists solely for CDI ("You need to manually add a non-private no-args constructor").
*/
@SuppressWarnings("unused")
ArtemisJmsXaConsumerManager() {
this(null, null, null, null);
}

public ArtemisJmsXaConsumerManager(ConnectionFactory connectionFactory,
XAConnectionFactory xaConnectionFactory,
TransactionManager tm,
String queueName) {
super(connectionFactory, queueName);
this.tm = tm;
this.queueName = queueName;
this.xaConnectionFactory = xaConnectionFactory;
}

public String receiveXA(boolean rollback) throws SystemException, RollbackException {
XAJMSContext context = xaConnectionFactory.createXAContext();
JMSConsumer consumer = context.createConsumer(context.createQueue(queueName));

tm.getTransaction().enlistResource(context.getXAResource());
tm.getTransaction().registerSynchronization(new Synchronization() {
@Override
public void beforeCompletion() {
}

@Override
public void afterCompletion(int i) {
consumer.close();
zhfeng marked this conversation as resolved.
Show resolved Hide resolved
context.close();
}
});
if (rollback) {
tm.setRollbackOnly();
}
try {
return consumer.receive(1000L).getBody(String.class);
turing85 marked this conversation as resolved.
Show resolved Hide resolved
} catch (JMSException | NullPointerException e) {
throw new RuntimeException("Could not receive message", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,4 +69,40 @@ public void testRollback(String endpoint, JMSContext context, String queueName)
Assertions.assertNull(message);
}
}

public void sendAndVerifyXACommit(JMSContext context, String queueName, String xaEndpoint, String endpoint) {
String body = createBody();
try (JMSContext autoClosedContext = context) {
context.createProducer().send(context.createQueue(queueName), body);
}

// Consume the message in xa transaction
Response response = RestAssured.with().get(xaEndpoint);
Assertions.assertEquals(jakarta.ws.rs.core.Response.Status.OK.getStatusCode(), response.statusCode());
Assertions.assertEquals(body, response.getBody().asString());

// Receive from queue again to confirm nothing is received i.e. message was consumed
// and now there is no message in the queue
response = RestAssured.with().get(endpoint);
Assertions.assertEquals(jakarta.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(),
middagj marked this conversation as resolved.
Show resolved Hide resolved
response.statusCode());
}

public void sendAndVerifyXARollback(JMSContext context, String queueName, String xaEndpoint, String endpoint) {
String body = createBody();
try (JMSContext autoClosedContext = context) {
context.createProducer().send(context.createQueue(queueName), body);
}

// Consume the message but in rollback transaction
Response response = RestAssured.with().get(xaEndpoint);
Assertions.assertEquals(jakarta.ws.rs.core.Response.Status.OK.getStatusCode(), response.statusCode());
Assertions.assertEquals(body, response.getBody().asString());

// Receive from queue again to confirm message is received i.e. message wasn't consumed
// and rollback occurred
response = RestAssured.with().get(endpoint);
Assertions.assertEquals(jakarta.ws.rs.core.Response.Status.OK.getStatusCode(),
response.statusCode());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import io.quarkus.it.artemis.jms.common.ArtemisJmsConsumerManager;
import io.quarkus.it.artemis.jms.common.ArtemisJmsXaConsumerManager;
import io.quarkus.it.artemis.jms.common.ArtemisJmsXaProducerManager;
import io.smallrye.common.annotation.Identifier;

Expand All @@ -17,19 +17,19 @@
@Produces(MediaType.TEXT_PLAIN)
public class ArtemisEndpoint {
private final ArtemisJmsXaProducerManager defaultProducer;
private final ArtemisJmsConsumerManager defaultConsumer;
private final ArtemisJmsXaConsumerManager defaultConsumer;
private final ArtemisJmsXaProducerManager namedOneProducer;
private final ArtemisJmsConsumerManager namedOneConsumer;
private final ArtemisJmsXaConsumerManager namedOneConsumer;
private final ArtemisJmsXaProducerManager externallyDefinedProducer;
private final ArtemisJmsConsumerManager externallyDefinedConsumer;
private final ArtemisJmsXaConsumerManager externallyDefinedConsumer;

public ArtemisEndpoint(
ArtemisJmsXaProducerManager defaultProducer,
ArtemisJmsConsumerManager defaultConsumer,
ArtemisJmsXaConsumerManager defaultConsumer,
@Identifier("named-1") ArtemisJmsXaProducerManager namedOneProducer,
@Identifier("named-1") ArtemisJmsConsumerManager namedOneConsumer,
@Identifier("named-1") ArtemisJmsXaConsumerManager namedOneConsumer,
@Identifier("externally-defined") ArtemisJmsXaProducerManager externallyDefinedProducer,
@Identifier("externally-defined") ArtemisJmsConsumerManager externallyDefinedConsumer) {
@Identifier("externally-defined") ArtemisJmsXaConsumerManager externallyDefinedConsumer) {
this.defaultProducer = defaultProducer;
this.defaultConsumer = defaultConsumer;
this.namedOneProducer = namedOneProducer;
Expand Down Expand Up @@ -92,4 +92,46 @@ public void externallyDefinedPostXA(String message) throws Exception {
public String externallyDefinedGet() {
return externallyDefinedConsumer.receive();
}

@GET
@Path("/xa")
@Transactional
public String defaultGetXACommit() throws Exception {
return defaultConsumer.receiveXA(false);
}

@GET
@Path("/xa-rollback")
@Transactional
public String defaultGetXARollback() throws Exception {
return defaultConsumer.receiveXA(true);
}

@GET
@Path("named-1/xa")
@Transactional
public String namedOneGetXACommit() throws Exception {
return namedOneConsumer.receiveXA(false);
}

@GET
@Path("named-1/xa-rollback")
@Transactional
public String namedOneGetXARollback() throws Exception {
return namedOneConsumer.receiveXA(true);
}

@GET
@Path("externally-defined/xa")
@Transactional
public String externallyDefinedXACommit() throws Exception {
return externallyDefinedConsumer.receiveXA(false);
}

@GET
@Path("externally-defined/xa-rollback")
@Transactional
public String externallyDefinedXARollback() throws Exception {
return externallyDefinedConsumer.receiveXA(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import io.quarkus.it.artemis.jms.common.ArtemisJmsConsumerManager;
import io.quarkus.it.artemis.jms.common.ArtemisJmsXaConsumerManager;
import io.quarkus.it.artemis.jms.common.ArtemisJmsXaProducerManager;
import io.smallrye.common.annotation.Identifier;

Expand All @@ -26,25 +26,35 @@ ActiveMQConnectionFactory externallyDefinedConnectionFactory(

@Produces
@ApplicationScoped
ArtemisJmsConsumerManager defaultConsumerManager(
@SuppressWarnings("CdiInjectionPointsInspection") ConnectionFactory connectionFactory) {
return new ArtemisJmsConsumerManager(connectionFactory, "test-jms-default");
ArtemisJmsXaConsumerManager defaultConsumerManager(
@SuppressWarnings("CdiInjectionPointsInspection") ConnectionFactory defaultConnectionFactory,
@SuppressWarnings("CdiInjectionPointsInspection") XAConnectionFactory defaultXaConnectionFactory,
@SuppressWarnings("CdiInjectionPointsInspection") TransactionManager tm) {
return new ArtemisJmsXaConsumerManager(defaultConnectionFactory, defaultXaConnectionFactory, tm, "test-jms-default");
}

@Produces
@ApplicationScoped
@Identifier("named-1")
ArtemisJmsConsumerManager namedOneConsumerManager(
@SuppressWarnings("CdiInjectionPointsInspection") @Identifier("named-1") ConnectionFactory namedOneConnectionFactory) {
return new ArtemisJmsConsumerManager(namedOneConnectionFactory, "test-jms-named-1");
ArtemisJmsXaConsumerManager namedOneConsumerManager(
@SuppressWarnings("CdiInjectionPointsInspection") @Identifier("named-1") ConnectionFactory namedOneConnectionFactory,
@SuppressWarnings("CdiInjectionPointsInspection") @Identifier("named-1") XAConnectionFactory namedOneXaConnectionFactory,
@SuppressWarnings("CdiInjectionPointsInspection") TransactionManager tm) {
return new ArtemisJmsXaConsumerManager(namedOneConnectionFactory, namedOneXaConnectionFactory, tm, "test-jms-named-1");
}

@Produces
@ApplicationScoped
@Identifier("externally-defined")
ArtemisJmsConsumerManager externallyDefinedConsumer(
@Identifier("externally-defined") ConnectionFactory namedOneConnectionFactory) {
return new ArtemisJmsConsumerManager(namedOneConnectionFactory, "test-jms-externally-defined");
ArtemisJmsXaConsumerManager externallyDefinedConsumer(
@Identifier("externally-defined") ConnectionFactory externallyDefinedConnectionFactory,
@Identifier("externally-defined") XAConnectionFactory externallyDefinedXaConnectionFactory,
@SuppressWarnings("CdiInjectionPointsInspection") TransactionManager tm) {
return new ArtemisJmsXaConsumerManager(
externallyDefinedConnectionFactory,
externallyDefinedXaConnectionFactory,
tm,
"test-jms-externally-defined");
}

@Produces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,25 @@ void testDefault() {
void testNamedOne() {
sendAndVerify(createNamedOneContext(), "test-jms-named-1", "/artemis/named-1");
}

@Test
void testDefaultXACommit() {
sendAndVerifyXACommit(createDefaultContext(), "test-jms-default", "/artemis/xa", "/artemis");
}

@Test
void testNamedOneXACommit() {
sendAndVerifyXACommit(createNamedOneContext(), "test-jms-named-1", "/artemis/named-1/xa", "/artemis/named-1/");
}

@Test
void testDefaultXARollback() {
sendAndVerifyXARollback(createDefaultContext(), "test-jms-default", "/artemis/xa-rollback", "/artemis");
}

@Test
void testNamedOneXARollback() {
sendAndVerifyXARollback(createNamedOneContext(), "test-jms-named-1", "/artemis/named-1/xa-rollback",
"/artemis/named-1/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,19 @@ void testExternallyDefined() {
"test-jms-externally-defined",
"/artemis/externally-defined");
}

@Test
void testExternallyDefinedXACommit() {
sendAndVerifyXACommit(createExternallyDefinedContext("artemis.externally-defined.url"), "test-jms-externally-defined",
"/artemis/externally-defined/xa",
"/artemis/externally-defined/");
}

@Test
void testExternallyDefinedXARollback() {
sendAndVerifyXARollback(createExternallyDefinedContext("artemis.externally-defined.url"),
"test-jms-externally-defined",
"/artemis/externally-defined/xa-rollback",
"/artemis/externally-defined/");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

import io.quarkus.it.artemis.jms.common.ArtemisJmsConsumerManager;
import io.quarkus.it.artemis.jms.common.ArtemisJmsXaConsumerManager;
import io.quarkus.it.artemis.jms.common.ArtemisJmsXaProducerManager;
import io.smallrye.common.annotation.Identifier;

Expand All @@ -17,15 +17,15 @@
@Produces(MediaType.TEXT_PLAIN)
public class ArtemisEndpoint {
private final ArtemisJmsXaProducerManager defaultProducer;
private final ArtemisJmsConsumerManager defaultConsumer;
private final ArtemisJmsXaConsumerManager defaultConsumer;
private final ArtemisJmsXaProducerManager namedOneProducer;
private final ArtemisJmsConsumerManager namedOneConsumer;
private final ArtemisJmsXaConsumerManager namedOneConsumer;

public ArtemisEndpoint(
ArtemisJmsXaProducerManager defaultProducer,
ArtemisJmsConsumerManager defaultConsumer,
ArtemisJmsXaConsumerManager defaultConsumer,
@Identifier("named-1") ArtemisJmsXaProducerManager namedOneProducer,
@Identifier("named-1") ArtemisJmsConsumerManager namedOneConsumer) {
@Identifier("named-1") ArtemisJmsXaConsumerManager namedOneConsumer) {
this.defaultProducer = defaultProducer;
this.defaultConsumer = defaultConsumer;
this.namedOneProducer = namedOneProducer;
Expand Down Expand Up @@ -67,4 +67,32 @@ public String namedOneGet() {
public void namedOnePostXA(String message) throws Exception {
namedOneProducer.sendXA(message);
}

@GET
@Path("/xa")
@Transactional
public String defaultGetXACommit() throws Exception {
return defaultConsumer.receiveXA(false);
}

@GET
@Path("/xa-rollback")
@Transactional
public String defaultGetXARollback() throws Exception {
return defaultConsumer.receiveXA(true);
}

@GET
@Path("named-1/xa")
@Transactional
public String namedOneGetXACommit() throws Exception {
return namedOneConsumer.receiveXA(false);
}

@GET
@Path("named-1/xa-rollback")
@Transactional
public String namedOneGetXARollback() throws Exception {
return namedOneConsumer.receiveXA(true);
}
}
Loading