From 12356b529646747ec58662b46caacb45b710513c Mon Sep 17 00:00:00 2001 From: conglisc Date: Fri, 31 Jul 2015 01:18:18 -0400 Subject: [PATCH 1/2] Update on JMSSource consistent region support --- .../JMSSource/JMSSource.xml | 25 +- .../messaging/jms/JMSConnectionHelper.java | 68 ++++-- .../ibm/streamsx/messaging/jms/JMSSource.java | 218 +++++++++++++++--- com.ibm.streamsx.messaging/info.xml | 4 +- 4 files changed, 267 insertions(+), 48 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml index 8f3fcea..7de491c 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml @@ -21,7 +21,30 @@ and sends it to the output stream. A single message is converted into a single t # Behavior in a consistent region -The `JMSSource` operator is not supported in a consistent region. +The `JMSSource` operator can participate in a consistent region. The operator must be at the start of a consistent region. + +The operator supports periodic and operator-driven consistent region policies. If the consistent region policy is set as operatorDriven, the triggerCount parameter must be specified. The operator initiates a checkpoint after number of tuples specified by the triggerCount parameter have been processed. +If the consistent region policy is set as periodic, the operator respects the period setting and establishes consistent states accordingly. + +When a message queue is consumed by multiple message consumers, i.e. multiple `JMSSource` instances are used to read messages from a same queue, then deterministic routing is required. This requirement can be achieved through the messageSelector parameter. For example, if an SPL application has two JMSSource operator instances and a JMS property named "group" is present on messages that can take value of either 'g1' or 'g2', then each JMSSource operator instance can be assigned in the following manner: + +MyPersonNamesStream1 = JMSSource() + { + param + connectionDocument :"/home/streamsuser/connections/JMSconnections.xml"; + connection : "amqConn"; + access : "amqAccess"; + messageSelector : "group = 'g1'"; + } + +MyPersonNamesStream2 = JMSSource() + { + param + connectionDocument :"/home/streamsuser/connections/JMSconnections.xml"; + connection : "amqConn"; + access : "amqAccess"; + messageSelector : "group = 'g2'"; + } # Exceptions diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java index 49387d0..e9642cb 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java @@ -74,7 +74,19 @@ class JMSConnectionHelper { private final boolean useClientAckMode; + // JMS message selector private String messageSelector; + + // Timestamp of session creation + private long sessionCreationTime; + + public long getSessionCreationTime() { + return sessionCreationTime; + } + + private void setSessionCreationTime(long sessionCreationTime) { + this.sessionCreationTime = sessionCreationTime; + } // procedure to detrmine if there exists a valid connection or not private boolean isConnectValid() { @@ -112,6 +124,7 @@ synchronized Session getSession() { // object private synchronized void setSession(Session session) { this.session = session; + this.setSessionCreationTime(System.currentTimeMillis()); } // setter for connect @@ -369,16 +382,26 @@ boolean sendMessage(Message message) throws ConnectionException, } // this subroutine receives messages from a message consumer - Message receiveMessage(long timeout) throws ConnectionException, InterruptedException, + // This method supports either blocking or non-blocking receive + // if wait is false, then timeout value is ignored + Message receiveMessage(boolean wait, long timeout) throws ConnectionException, InterruptedException, JMSException { try { - // try to receive a message - synchronized (getSession()) { - return (getConsumer().receive(timeout)); + + if(wait) { + // try to receive a message via blocking method + synchronized (getSession()) { + return (getConsumer().receive(timeout)); + } } - + else { + // try to receive a message with non blocking method + synchronized (getSession()) { + return (getConsumer().receiveNoWait()); + } + } + } - catch (JMSException e) { // If the JMSSource operator was interrupted in middle if (e.toString().contains("java.lang.InterruptedException")) { @@ -391,21 +414,40 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce new Object[] { e.toString() }); logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); createConnection(); - // retry to receive - synchronized (getSession()) { - return (getConsumer().receive()); + // retry to receive again + if(wait) { + // try to receive a message via blocking method + synchronized (getSession()) { + return (getConsumer().receive(timeout)); + } + } + else { + // try to receive a message with non blocking method + synchronized (getSession()) { + return (getConsumer().receiveNoWait()); + } } } } - public void recoverSession() throws JMSException { - - if(getSession() != null) { + // Recovers session causing unacknowledged message to be re-delivered + public void recoverSession() throws JMSException, ConnectionException, InterruptedException { + + try { synchronized (getSession()) { getSession().recover(); } + } catch (JMSException e) { + + logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); + setConnect(null); + createConnection(); + + synchronized (getSession()) { + getSession().recover(); + } + } - } // close the connection diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index 8fb35b3..3a7f90f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -6,7 +6,11 @@ import java.io.File; import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; +import java.util.List; import java.util.logging.Logger; import javax.jms.JMSException; @@ -90,7 +94,7 @@ public class JMSSource extends ProcessTupleProducer implements StateHandler{ Metric nReconnectionAttempts; // when in consistent region, this parameter is used to indicate max time the receive method should block - public static final long receiveTimeout = 500l; + public static final long RECEIVE_TIMEOUT = 500l; // initialize the metrices. @CustomMetric(kind = Metric.Kind.COUNTER) @@ -163,8 +167,11 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { private JMSSourceCRState crState = null; private String messageSelector = null; - + private boolean initalConnectionEstablished = false; + + private Object resetLock = new Object(); + public String getMessageSelector() { return messageSelector; } @@ -235,20 +242,28 @@ private class JMSSourceCRState { // Counters for counting number of received messages private int msgCounter; - private boolean doCheckpoint; + // Flag to indicate a checkpoint has been made. + private boolean isCheckpointPerformed; + + private List msgIDWIthSameTS; JMSSourceCRState() { lastMsgSent = null; msgCounter = 0; - doCheckpoint = false; + isCheckpointPerformed = false; + msgIDWIthSameTS = new ArrayList(); } - - private boolean isDoCheckpoint() { - return doCheckpoint; + + public List getMsgIDWIthSameTS() { + return msgIDWIthSameTS; } - private void setDoCheckpoint(boolean doCheckpoint) { - this.doCheckpoint = doCheckpoint; + public boolean isCheckpointPerformed() { + return isCheckpointPerformed; + } + + public void setCheckpointPerformed(boolean isCheckpointPerformed) { + this.isCheckpointPerformed = isCheckpointPerformed; } public void increaseMsgCounterByOne() { @@ -260,16 +275,23 @@ public Message getLastMsgSent() { } public void setLastMsgSent(Message lastMsgSent) { - this.lastMsgSent = lastMsgSent; + + try { + if(this.lastMsgSent != null && this.lastMsgSent.getJMSTimestamp() != lastMsgSent.getJMSTimestamp()) { + this.msgIDWIthSameTS.clear(); + } + + this.msgIDWIthSameTS.add(lastMsgSent.getJMSMessageID()); + } catch (JMSException e) { + + } + this.lastMsgSent = lastMsgSent; + } public int getMsgCounter() { return msgCounter; } - - public void setMsgCounter(int msgCounter) { - this.msgCounter = msgCounter; - } public void acknowledgeMsg() throws JMSException { if(lastMsgSent != null) { @@ -280,7 +302,8 @@ public void acknowledgeMsg() throws JMSException { public void reset() { lastMsgSent = null; msgCounter = 0; - doCheckpoint = false; + isCheckpointPerformed = false; + msgIDWIthSameTS = new ArrayList(); } } @@ -420,11 +443,11 @@ public synchronized void initialize(OperatorContext context) consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class); + JmsClasspathUtil.setupClassPaths(context); + if(consistentRegionContext != null) { crState = new JMSSourceCRState(); } - - JmsClasspathUtil.setupClassPaths(context); // create connection document parser object (which is responsible for // parsing the connection document) @@ -504,13 +527,28 @@ public synchronized void initialize(OperatorContext context) @Override protected void process() throws UnsupportedEncodingException, InterruptedException, ConnectionException, Exception { - // create the initial connection. - jmsConnectionHelper.createInitialConnection(); boolean isInConsistentRegion = consistentRegionContext != null; boolean isTriggerOperator = isInConsistentRegion && consistentRegionContext.isTriggerOperator(); - long timeout = isInConsistentRegion ? JMSSource.receiveTimeout : 0; + // create the initial connection. + try { + jmsConnectionHelper.createInitialConnection(); + if(isInConsistentRegion) { + notifyResetLock(true); + } + } catch (Exception e1) { + + if(isInConsistentRegion) { + notifyResetLock(false); + } + // Initial connection fails to be created. + // throw the exception. + throw e1; + } + + long timeout = isInConsistentRegion ? JMSSource.RECEIVE_TIMEOUT : 0; + long sessionCreationTime = 0; while (!Thread.interrupted()) { // read a message from the consumer @@ -521,19 +559,42 @@ protected void process() throws UnsupportedEncodingException, consistentRegionContext.acquirePermit(); // A checkpoint has been made, thus acknowledging the last sent message - if(crState.isDoCheckpoint()) { - - crState.acknowledgeMsg(); + if(crState.isCheckpointPerformed()) { - crState.reset(); + try { + crState.acknowledgeMsg(); + } catch (Exception e) { + consistentRegionContext.reset(); + } finally { + crState.reset(); + } + } } - Message m = jmsConnectionHelper.receiveMessage(timeout); + Message m = jmsConnectionHelper.receiveMessage(true, timeout); if(m == null) { continue; } + + if(isInConsistentRegion) { + // following section takes care of possible duplicate messages + // i.e connection re-created due to failure causing unacknowledged message to be delivered again + // we don't want to process duplicate messages again. + if(crState.getLastMsgSent() == null) { + sessionCreationTime = jmsConnectionHelper.getSessionCreationTime(); + } + else { + // if session has been re-created and message is duplicate,ignore + if(jmsConnectionHelper.getSessionCreationTime() > sessionCreationTime && + isDuplicateMsg(m, crState.getLastMsgSent().getJMSTimestamp(), crState.getMsgIDWIthSameTS())) { + logger.log(LogLevel.INFO, "Ignored duplicated message: " + m.getJMSMessageID()); + continue; + } + } + } + // nMessagesRead indicates the number of messages which we have // read from the JMS Provider successfully nMessagesRead.incrementValue(1); @@ -604,13 +665,11 @@ protected void process() throws UnsupportedEncodingException, // If the consistent region is driven by operator, then // 1. increase message counter // 2. Call make consistent region if message counter reached the triggerCounter specified by user - // 3. Reset counter to 0 for next round of CR if(isTriggerOperator) { crState.increaseMsgCounterByOne(); if(crState.getMsgCounter() == getTriggerCount()){ consistentRegionContext.makeConsistent(); - //crState.reset(); } } @@ -647,6 +706,27 @@ public void shutdown() throws Exception { } } + + private boolean isInitalConnectionEstablished() throws InterruptedException { + + synchronized(resetLock) { + if(initalConnectionEstablished) { + return true; + } + + resetLock.wait(); + return initalConnectionEstablished; + } + } + + private void notifyResetLock(boolean result) { + if(consistentRegionContext != null) { + synchronized(resetLock) { + initalConnectionEstablished = result; + resetLock.notifyAll(); + } + } + } @Override public void close() throws IOException { @@ -656,8 +736,17 @@ public void close() throws IOException { @Override public void checkpoint(Checkpoint checkpoint) throws Exception { logger.log(LogLevel.INFO, "Checkpoint... "); - - crState.setDoCheckpoint(true); + + crState.setCheckpointPerformed(true); + + ObjectOutputStream stream = checkpoint.getOutputStream(); + + stream.writeBoolean(crState.getLastMsgSent() != null); + + if(crState.getLastMsgSent() != null) { + stream.writeLong(crState.getLastMsgSent().getJMSTimestamp()); + stream.writeObject(crState.getMsgIDWIthSameTS()); + } } @@ -667,23 +756,88 @@ public void drain() throws Exception { } + @SuppressWarnings("unchecked") @Override public void reset(Checkpoint checkpoint) throws Exception { logger.log(LogLevel.INFO, "Reset to checkpoint " + checkpoint.getSequenceId()); + if(!isInitalConnectionEstablished()) { + throw new ConnectionException("Connection to JMS failed."); + } + // Reset consistent region variables and recover JMS session to make re-delivery of // unacknowledged message - crState.reset(); jmsConnectionHelper.recoverSession(); + + ObjectInputStream stream = checkpoint.getInputStream(); + boolean hasMsg = stream.readBoolean(); + + if(hasMsg) { + long lastSentMsgTS = stream.readLong(); + List lastSentMsgIDs = (List) stream.readObject(); + + deduplicateMsg(lastSentMsgTS, lastSentMsgIDs); + } + + crState.reset(); + + } + + private boolean isDuplicateMsg(Message msg, long lastSentMsgTs, List lastSentMsgIDs) throws JMSException { + boolean res = false; + + if(msg.getJMSTimestamp() < lastSentMsgTs) { + res = true; + } + else if(msg.getJMSTimestamp() == lastSentMsgTs) { + + if(lastSentMsgIDs.contains(msg.getJMSMessageID())) { + res = true; + } + + } + + return res; + + } + + private void deduplicateMsg(long lastSentMsgTs, List lastSentMsgIDs) throws JMSException, ConnectionException, InterruptedException { + logger.log(LogLevel.INFO, "Deduplicate messages..."); + + boolean stop = false; + + while(!stop) { + + Message msg = jmsConnectionHelper.receiveMessage(false, 0); + + if(msg == null) { + return; + } + + if(isDuplicateMsg(msg, lastSentMsgTs, lastSentMsgIDs)) { + msg.acknowledge(); + logger.log(LogLevel.INFO, "Ignored duplicated message: " + msg.getJMSMessageID()); + } + else { + jmsConnectionHelper.recoverSession(); + stop = true; + } + + } } @Override public void resetToInitialState() throws Exception { logger.log(LogLevel.INFO, "Resetting to Initial..."); - - crState.reset(); + + if(!isInitalConnectionEstablished()) { + throw new ConnectionException("Connection to JMS failed."); + } + jmsConnectionHelper.recoverSession(); + crState.reset(); + } @Override diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 7e3e131..8ecb41c 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -680,8 +680,8 @@ The <attribute> element has three possible attributes: * composite types * xml - 2.0.0 - 4.0.0.0 + 2.1.1 + 4.0.1.0 From 82bcf752eeff977a3b1cb8acb6c5ff362bf9f57a Mon Sep 17 00:00:00 2001 From: conglisc Date: Fri, 31 Jul 2015 18:32:43 -0400 Subject: [PATCH 2/2] #99: add back consistent region support --- .../JMSSource/JMSSource.xml | 18 +++++++++++++ .../ibm/streamsx/messaging/jms/JMSSink.java | 25 +++++++++++++++++-- 2 files changed, 41 insertions(+), 2 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml index efd6954..7de491c 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.jms/JMSSource/JMSSource.xml @@ -288,6 +288,24 @@ If this parameter is not specified, the operator uses the file that is in the de rstring 1 + + triggerCount + +This optional parameter specifies how many messages are submitted before the JMSSource operator starts to drain the pipeline and establish a consistent state. +This parameter must be greater than zero and must be set if the JMSSource operator is the start operator of an operatorDriven consistent region. + true + int32 + 1 + + + messageSelector + + This optional parameter is used as JMS Message Selector. + + true + rstring + 1 + diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index 7cf322e..bcb8a5f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -36,6 +36,7 @@ import com.ibm.streams.operator.state.ConsistentRegionContext; import com.ibm.streams.operator.state.StateHandler; + //The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic. public class JMSSink extends AbstractOperator implements StateHandler{ @@ -158,6 +159,9 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { // Variable to define if the connection attempted to the JMSProvider is the // first one. private boolean isInitialConnection = true; + + // consistent region context + private ConsistentRegionContext consistentRegionContext; // Mandatory parameter access @Parameter(optional = false) @@ -373,7 +377,7 @@ public synchronized void initialize(OperatorContext context) JmsClasspathUtil.setupClassPaths(context); - context.registerStateHandler(this); + consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class); /* * Set appropriate variables if the optional error output port is @@ -414,7 +418,7 @@ public synchronized void initialize(OperatorContext context) jmsConnectionHelper = new JMSConnectionHelper(reconnectionPolicy, reconnectionBound, period, true, maxMessageSendRetries, messageSendRetryDelay, connectionDocumentParser.getDeliveryMode(), - nReconnectionAttempts, nFailedInserts, logger); + nReconnectionAttempts, nFailedInserts, logger, (consistentRegionContext != null)); jmsConnectionHelper.createAdministeredObjects( connectionDocumentParser.getInitialContextFactory(), connectionDocumentParser.getProviderURL(), @@ -486,6 +490,7 @@ public void process(StreamingInput stream, Tuple tuple) jmsConnectionHelper.createInitialConnection(); isInitialConnection = false; } + // Construct the JMS message based on the message type taking the // attributes from the tuple. Message message = mhandler.convertTupleToMessage(tuple, @@ -499,6 +504,8 @@ public void process(StreamingInput stream, Tuple tuple) "Dropping this tuple since an exception occurred while sending."); } } + + } @@ -539,6 +546,11 @@ public void close() throws IOException { @Override public void checkpoint(Checkpoint checkpoint) throws Exception { logger.log(LogLevel.INFO, "checkpoint " + checkpoint.getSequenceId()); + + if(jmsConnectionHelper.getSession() != null) { + jmsConnectionHelper.getSession().commit(); + } + } @Override @@ -549,11 +561,20 @@ public void drain() throws Exception { @Override public void reset(Checkpoint checkpoint) throws Exception { logger.log(LogLevel.INFO, "Reset to checkpoint " + checkpoint.getSequenceId()); + + if(jmsConnectionHelper.getSession() != null) { + jmsConnectionHelper.getSession().rollback(); + } + } @Override public void resetToInitialState() throws Exception { logger.log(LogLevel.INFO, "Reset to initial state"); + + if(jmsConnectionHelper.getSession() != null) { + jmsConnectionHelper.getSession().rollback(); + } } @Override