diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index ea94225..c965395 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -15,8 +15,11 @@ import java.util.List; import java.util.logging.Logger; +import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Queue; +import javax.jms.Topic; import javax.naming.NamingException; import javax.xml.parsers.ParserConfigurationException; @@ -1115,7 +1118,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS jmsDestinationAttrIdx = streamSchema.getAttributeIndex(this.getJmsDestinationOutAttrName()); } if(jmsDestinationAttrIdx != -1 && msg.getJMSDestination() != null) { - outTuple.setObject(jmsDestinationAttrIdx, new RString(msg.getJMSDestination().toString())); + outTuple.setObject(jmsDestinationAttrIdx, new RString(getDestinationName(msg.getJMSDestination()))); } if(this.getJmsDeliveryModeOutAttrName() != null) { @@ -1164,7 +1167,7 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS jmsReplyToAttrIdx = streamSchema.getAttributeIndex(this.getJmsReplyToOutAttrName()); } if(jmsReplyToAttrIdx != -1 && msg.getJMSReplyTo() != null) { - outTuple.setObject(jmsReplyToAttrIdx, new RString(msg.getJMSReplyTo().toString())); + outTuple.setObject(jmsReplyToAttrIdx, new RString(getDestinationName(msg.getJMSReplyTo()))); } if(this.getJmsTypeOutAttrName() != null) { @@ -1182,7 +1185,39 @@ private void handleJmsHeaderValues(Message msg, OutputTuple outTuple) throws JMS } } + + /** + * Determines and returns the destination name + * + * @param destination The destination to determine the name for. + * @return The name of a Queue or Topic, or a string representation of the destination object. + * @throws JMSException + */ + private String getDestinationName(Destination destination) throws JMSException { + if (destination instanceof Queue) return ((Queue)destination).getQueueName(); + if (destination instanceof Topic) return ((Topic)destination).getTopicName(); + return destination.toString(); + } + + + /** + * Handles the property values of the current message. + * + * @param msg The current JMS message. + * @param outTuple The output tuple. + * @throws JMSException + */ +// private void handleJmsMessagePropertyValues(Message msg, OutputTuple outTuple) throws JMSException { +// Enumeration propertyNames = msg.getPropertyNames(); +// +// while (propertyNames.hasMoreElements()) { +// String name = propertyNames.nextElement(); +// +// +// } +// } + // Send the error message on to the error output port if one is specified private void sendOutputErrorMsg(String errorMessage) { OutputTuple errorTuple = errorOutputPort.newTuple();