diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/EmptyMessageHandler.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/EmptyMessageHandler.java index 8f41bc8..b61dca1 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/EmptyMessageHandler.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/EmptyMessageHandler.java @@ -6,12 +6,20 @@ import java.util.List; +import javax.jms.BytesMessage; +import javax.jms.DeliveryMode; import javax.jms.JMSException; +import javax.jms.MapMessage; import javax.jms.Message; +import javax.jms.ObjectMessage; import javax.jms.Session; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; +import com.ibm.rmi.corba.ObjectManager; import com.ibm.streams.operator.Tuple; import com.ibm.streams.operator.OutputTuple; +import com.ibm.streams.operator.Type.MetaType; //Empty message class is used in JMSSink/JMSSource to send/receive // control message to the JMSPorvider to test initial connectivity. @@ -24,21 +32,75 @@ class EmptyMessageHandler extends JMSMessageHandlerImpl { } // Used by JMSSink to convert an incoming tuple to JMS MEssage - public Message convertTupleToMessage(Tuple tuple, Session session) - throws JMSException { + public Message convertTupleToMessage(Tuple tuple, Session session) throws JMSException { synchronized (session) { // simply create a new JMSMEssage and return return session.createMessage(); } - } // Used by JMSSource to convert an incoming JMS Message to a tuple - public MessageAction convertMessageToTuple(Message message, - OutputTuple tuple) { - // No validations are performed regarding the type of message - // No values are assigned to tuple elements. + public MessageAction convertMessageToTuple(Message message, OutputTuple tuple) { + + MetaType attrType = tuple.getStreamSchema().getAttribute(0).getType().getMetaType(); + + // if the first attribute is a RString, encode message information into + // the rstring + if (attrType == MetaType.RSTRING) { + try { + String msgId = message.getJMSMessageID(); + long expiration = message.getJMSExpiration(); + String type = getMessageType(message); + String deliveryModeStr = getDeliveryMode(message); + + // format is "messagetype, msgid, deliveryMode, expiration" + StringBuilder builder = new StringBuilder(); + builder.append(type); + builder.append(","); + builder.append(msgId); + builder.append(","); + builder.append(deliveryModeStr); + builder.append(","); + builder.append(expiration); + + tuple.setString(0, builder.toString()); + } catch (JMSException e) { + } + } + return MessageAction.SUCCESSFUL_MESSAGE; } + + private String getMessageType(Message message) { + if (message instanceof BytesMessage) + return "bytes"; + else if (message instanceof StreamMessage) + return "streams"; + else if (message instanceof MapMessage) + return "map"; + else if (message instanceof TextMessage) + return "text"; + else if (message instanceof ObjectMessage) + return "object"; + else if (message instanceof Message) + return "empty"; + return "unknown"; + } + + private String getDeliveryMode(Message message) { + String deliveryModeStr = "unknown"; + int deliveryMode = -1; + try { + deliveryMode = message.getJMSDeliveryMode(); + } catch (JMSException e) { + } + + if (deliveryMode == DeliveryMode.NON_PERSISTENT) { + deliveryModeStr = "non_persistent"; + } else if (deliveryMode == DeliveryMode.PERSISTENT) { + deliveryModeStr = "persistent"; + } + return deliveryModeStr; + } }