From 0759a021151ef43d08bae4e27fc20e45e173d74e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jiri=20Dan=C4=9Bk?= Date: Fri, 24 Mar 2023 21:51:31 +0100 Subject: [PATCH 1/3] Assorted jmslib and jakartalib fixups --- .../src/main/java/com/redhat/mqe/lib/ReceiverClient.java | 2 +- .../src/main/java/com/redhat/mqe/lib/SenderClient.java | 5 ++--- jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java | 5 ++--- 3 files changed, 5 insertions(+), 7 deletions(-) diff --git a/jakartalib/src/main/java/com/redhat/mqe/lib/ReceiverClient.java b/jakartalib/src/main/java/com/redhat/mqe/lib/ReceiverClient.java index 297ff98f..ce764994 100644 --- a/jakartalib/src/main/java/com/redhat/mqe/lib/ReceiverClient.java +++ b/jakartalib/src/main/java/com/redhat/mqe/lib/ReceiverClient.java @@ -155,7 +155,7 @@ private void unsubscribe() { try { session.unsubscribe(durableSubscriberName); } catch (JMSException e) { - LOG.error("Error while unsubscribing durable subscriptor " + durableSubscriberName); + LOG.error("Error while unsubscribing durable subscriber " + durableSubscriberName); e.printStackTrace(); } finally { close(session); diff --git a/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java b/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java index 9495b16b..2adab891 100644 --- a/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java +++ b/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java @@ -165,7 +165,7 @@ public void startClient() { doTransaction(session, senderOptions.getOption(ClientOptions.TX_ENDLOOP_ACTION).getValue()); } } catch (JMSException | IllegalArgumentException jmse) { - LOG.error("Error while sending a message!", jmse.getMessage()); + LOG.error("Error while sending a message! {}", jmse.getMessage()); jmse.printStackTrace(); System.exit(1); } finally { @@ -185,8 +185,7 @@ public void startClient() { protected static void setMessageProducer(ClientOptions senderOptions, MessageProducer producer) { try { // set delivery mode - durable/non-durable - String deliveryModeArg = senderOptions.getOption(ClientOptions.MSG_DURABLE).getValue().toLowerCase(); - int deliveryMode = (deliveryModeArg.equals("true") || deliveryModeArg.equals("yes")) + int deliveryMode = Utils.convertOptionToBoolean(senderOptions.getOption(ClientOptions.MSG_DURABLE).getValue()) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; producer.setDeliveryMode(deliveryMode); // set time to live of message if provided diff --git a/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java b/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java index 14b62658..bc733ea4 100644 --- a/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java +++ b/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java @@ -165,7 +165,7 @@ public void startClient() { doTransaction(session, senderOptions.getOption(ClientOptions.TX_ENDLOOP_ACTION).getValue()); } } catch (JMSException | IllegalArgumentException jmse) { - LOG.error("Error while sending a message!", jmse.getMessage()); + LOG.error("Error while sending a message! {}", jmse.getMessage()); jmse.printStackTrace(); System.exit(1); } finally { @@ -185,8 +185,7 @@ public void startClient() { protected static void setMessageProducer(ClientOptions senderOptions, MessageProducer producer) { try { // set delivery mode - durable/non-durable - String deliveryModeArg = senderOptions.getOption(ClientOptions.MSG_DURABLE).getValue().toLowerCase(); - int deliveryMode = (deliveryModeArg.equals("true") || deliveryModeArg.equals("yes")) + int deliveryMode = Utils.convertOptionToBoolean(senderOptions.getOption(ClientOptions.MSG_DURABLE).getValue()) ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT; producer.setDeliveryMode(deliveryMode); // set time to live of message if provided From eb52263136888d2a1db5f898546b5690ac3b18d7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jiri=20Dan=C4=9Bk?= Date: Fri, 24 Mar 2023 21:51:04 +0100 Subject: [PATCH 2/3] Implement --conn-anonymous-producer option --- .../main/java/com/redhat/mqe/lib/ClientOptions.java | 1 + .../src/main/java/com/redhat/mqe/lib/SenderClient.java | 10 ++++++++-- .../main/java/com/redhat/mqe/lib/SenderOptions.java | 1 + .../main/java/com/redhat/mqe/lib/ClientOptions.java | 1 + .../src/main/java/com/redhat/mqe/lib/SenderClient.java | 10 ++++++++-- .../main/java/com/redhat/mqe/lib/SenderOptions.java | 1 + lib/src/main/java/com/redhat/mqe/lib/Utils.java | 5 +++++ 7 files changed, 25 insertions(+), 4 deletions(-) diff --git a/jakartalib/src/main/java/com/redhat/mqe/lib/ClientOptions.java b/jakartalib/src/main/java/com/redhat/mqe/lib/ClientOptions.java index c5732b79..2d246e84 100644 --- a/jakartalib/src/main/java/com/redhat/mqe/lib/ClientOptions.java +++ b/jakartalib/src/main/java/com/redhat/mqe/lib/ClientOptions.java @@ -145,6 +145,7 @@ void setOptions(ClientOptionManager clientOptionManager, @Args String[] args) { public static final String CONN_TCP_NO_DELAY = "conn-tcp-no-delay"; // wireFormat.tcpNoDelayEnabled public static final String CONN_TIGHT_ENCODING_ENA = "conn-tight-encoding-ena"; // wireFormat.tightEncodingEnabled public static final String CONN_WATCH_TOPIC_ADVISORIES = "conn-watch-topic-advisories"; + public static final String CONN_ANONYMOUS_PRODUCER = "conn-anonymous-producer"; // TODO Not implemented by client libraries // static final String CON_SSL_PROTOCOL = "conn-ssl-protocol"; diff --git a/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java b/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java index 2adab891..14848ccf 100644 --- a/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java +++ b/jakartalib/src/main/java/com/redhat/mqe/lib/SenderClient.java @@ -79,7 +79,9 @@ public void startClient() { Session session = (transaction == null || transaction.equals("none")) ? this.createSession(senderOptions, connection, false) : this.createSession(senderOptions, connection, true); connection.start(); - MessageProducer msgProducer = session.createProducer(this.getDestination()); + + boolean anonymousProducer = Utils.convertOptionToBoolean(senderOptions.getOption(ClientOptions.CONN_ANONYMOUS_PRODUCER).getValue()); + MessageProducer msgProducer = anonymousProducer ? session.createProducer(null) : session.createProducer(this.getDestination()); setMessageProducer(senderOptions, msgProducer); // Calculate msg-rate from COUNT & DURATION @@ -104,7 +106,11 @@ public void startClient() { // Send messages try { - msgProducer.send(message); + if (anonymousProducer) { + msgProducer.send(getDestination(), message); + } else { + msgProducer.send(message); + } } catch (Exception e) { switch (e.getCause().getClass().getName()) { case "org.apache.qpid.jms.provider.exceptions.ProviderDeliveryReleasedException": diff --git a/jakartalib/src/main/java/com/redhat/mqe/lib/SenderOptions.java b/jakartalib/src/main/java/com/redhat/mqe/lib/SenderOptions.java index dbdd47eb..613ecba0 100644 --- a/jakartalib/src/main/java/com/redhat/mqe/lib/SenderOptions.java +++ b/jakartalib/src/main/java/com/redhat/mqe/lib/SenderOptions.java @@ -47,6 +47,7 @@ public class SenderOptions extends ClientOptions { new Option(PROPERTY_TYPE, "", "PTYPE", "String", "specify the type of message property"), new Option(MSG_PROPERTY, "", "KEY=PVALUE", "", "specify message property as KEY=VALUE (use '~' instead of '=' for auto-casting)"), new Option(CONTENT_TYPE, "", "CTYPE", "String", "specify type of the actual content type"), + new Option(CONN_ANONYMOUS_PRODUCER, "", "ANONYMOUS", "no", "create anonymous (no queue specified) producer"), new Option(MSG_CONTENT_TYPE, "", "MSGTYPE", "", "type of JMSMessageBody to use in header"), new Option(MSG_CONTENT_FROM_FILE, "", "PATH", "", "specify filename to load content from"), new Option(MSG_CONTENT, "", "CONTENT", "", "actual content fed to message body"), diff --git a/jmslib/src/main/java/com/redhat/mqe/lib/ClientOptions.java b/jmslib/src/main/java/com/redhat/mqe/lib/ClientOptions.java index c5732b79..2d246e84 100644 --- a/jmslib/src/main/java/com/redhat/mqe/lib/ClientOptions.java +++ b/jmslib/src/main/java/com/redhat/mqe/lib/ClientOptions.java @@ -145,6 +145,7 @@ void setOptions(ClientOptionManager clientOptionManager, @Args String[] args) { public static final String CONN_TCP_NO_DELAY = "conn-tcp-no-delay"; // wireFormat.tcpNoDelayEnabled public static final String CONN_TIGHT_ENCODING_ENA = "conn-tight-encoding-ena"; // wireFormat.tightEncodingEnabled public static final String CONN_WATCH_TOPIC_ADVISORIES = "conn-watch-topic-advisories"; + public static final String CONN_ANONYMOUS_PRODUCER = "conn-anonymous-producer"; // TODO Not implemented by client libraries // static final String CON_SSL_PROTOCOL = "conn-ssl-protocol"; diff --git a/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java b/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java index bc733ea4..0b9cd656 100644 --- a/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java +++ b/jmslib/src/main/java/com/redhat/mqe/lib/SenderClient.java @@ -79,7 +79,9 @@ public void startClient() { Session session = (transaction == null || transaction.equals("none")) ? this.createSession(senderOptions, connection, false) : this.createSession(senderOptions, connection, true); connection.start(); - MessageProducer msgProducer = session.createProducer(this.getDestination()); + + boolean anonymousProducer = Utils.convertOptionToBoolean(senderOptions.getOption(ClientOptions.CONN_ANONYMOUS_PRODUCER).getValue()); + MessageProducer msgProducer = anonymousProducer ? session.createProducer(null) : session.createProducer(this.getDestination()); setMessageProducer(senderOptions, msgProducer); // Calculate msg-rate from COUNT & DURATION @@ -104,7 +106,11 @@ public void startClient() { // Send messages try { - msgProducer.send(message); + if (anonymousProducer) { + msgProducer.send(getDestination(), message); + } else { + msgProducer.send(message); + } } catch (Exception e) { switch (e.getCause().getClass().getName()) { case "org.apache.qpid.jms.provider.exceptions.ProviderDeliveryReleasedException": diff --git a/jmslib/src/main/java/com/redhat/mqe/lib/SenderOptions.java b/jmslib/src/main/java/com/redhat/mqe/lib/SenderOptions.java index dbdd47eb..613ecba0 100644 --- a/jmslib/src/main/java/com/redhat/mqe/lib/SenderOptions.java +++ b/jmslib/src/main/java/com/redhat/mqe/lib/SenderOptions.java @@ -47,6 +47,7 @@ public class SenderOptions extends ClientOptions { new Option(PROPERTY_TYPE, "", "PTYPE", "String", "specify the type of message property"), new Option(MSG_PROPERTY, "", "KEY=PVALUE", "", "specify message property as KEY=VALUE (use '~' instead of '=' for auto-casting)"), new Option(CONTENT_TYPE, "", "CTYPE", "String", "specify type of the actual content type"), + new Option(CONN_ANONYMOUS_PRODUCER, "", "ANONYMOUS", "no", "create anonymous (no queue specified) producer"), new Option(MSG_CONTENT_TYPE, "", "MSGTYPE", "", "type of JMSMessageBody to use in header"), new Option(MSG_CONTENT_FROM_FILE, "", "PATH", "", "specify filename to load content from"), new Option(MSG_CONTENT, "", "CONTENT", "", "actual content fed to message body"), diff --git a/lib/src/main/java/com/redhat/mqe/lib/Utils.java b/lib/src/main/java/com/redhat/mqe/lib/Utils.java index ddd669f0..12271f9d 100644 --- a/lib/src/main/java/com/redhat/mqe/lib/Utils.java +++ b/lib/src/main/java/com/redhat/mqe/lib/Utils.java @@ -294,4 +294,9 @@ public static Object getObjectValue(Class clazz, String object, boolean allow return myObj; } + + public static boolean convertOptionToBoolean(String optionStringValue) { + String optionValue = optionStringValue.toLowerCase(); + return (optionValue.equals("true") || optionValue.equals("yes")); + } } From 3995aa8c5df1c46d770ef3ecc82042c9ef77ae41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jiri=20Dan=C4=9Bk?= Date: Sat, 25 Mar 2023 08:48:53 +0100 Subject: [PATCH 3/3] fixup --- .../src/main/java/com/redhat/mqe/jms/AacSenderOptions.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cli-qpid-jms/src/main/java/com/redhat/mqe/jms/AacSenderOptions.java b/cli-qpid-jms/src/main/java/com/redhat/mqe/jms/AacSenderOptions.java index 5b77b9fc..efec7441 100644 --- a/cli-qpid-jms/src/main/java/com/redhat/mqe/jms/AacSenderOptions.java +++ b/cli-qpid-jms/src/main/java/com/redhat/mqe/jms/AacSenderOptions.java @@ -75,7 +75,8 @@ public class AacSenderOptions extends AacClientOptions { // TODO new Option(SYNC_MODE, "", "SYNCMODE", "action", "synchronization mode: none/session/action/persistent/transient"), new Option(CAPACITY, "", "CAPACITY", "-1", "sender|receiver capacity (no effect in jms atm)"), - new Option(ON_RELEASE, "", "ACTION", "fail", "fail|ignore|retry action to perform if message is released by receiver") + new Option(ON_RELEASE, "", "ACTION", "fail", "fail|ignore|retry action to perform if message is released by receiver"), + new Option(CONN_ANONYMOUS_PRODUCER, "", "ANONYMOUS", "no", "create anonymous (no queue specified) producer") )); }