From c3a6565abd78fe71241622c6158dd5d1e738452a Mon Sep 17 00:00:00 2001 From: rolefhei Date: Fri, 13 Oct 2017 13:42:31 +0200 Subject: [PATCH 1/2] JMSSink: reset CR in case of msg not sent --- .../messaging/jms/JMSConnectionHelper.java | 23 ++----------------- .../ibm/streamsx/messaging/jms/JMSSink.java | 23 +++++++++---------- 2 files changed, 13 insertions(+), 33 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java index 31c1d1f..dcf9dbc 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java @@ -547,35 +547,16 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce // Send message without retry in case of failure // i.e connection problems, this method raise the error back to caller. // No connection or message retry will be attempted. - boolean sendMessageNoRetry(Message message) throws JMSException { - - boolean res = false; - + void sendMessageNoRetry(Message message) throws JMSException { try { - - // try to send the message synchronized (getSession()) { getProducer().send(message); - res = true; - } } catch (JMSException e) { - // error has occurred, log error and try sending message again logger.log(LogLevel.WARN, "ERROR_DURING_SEND", new Object[] { e.toString() }); //$NON-NLS-1$ - - // If the exception is caused by message format, then we can return peacefully as connection is still good. - if(!(e instanceof MessageFormatException)) { - throw e; - } - + throw e; } - - if(!res) { - nFailedInserts.incrementValue(1); - } - - return res; } // send a consistent region message to the consistent region queue diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index 570a34b..929c4ed 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -720,8 +720,8 @@ private void registerForDataGovernance(String providerURL, String destination) { @Override public void process(StreamingInput stream, Tuple tuple) throws Exception { - boolean msgSent = false; if(consistentRegionContext == null) { + boolean msgSent = false; // Operator is not in a consistent region if (isInitialConnection) { jmsConnectionHelper.createInitialConnection(); @@ -752,6 +752,15 @@ public void process(StreamingInput stream, Tuple tuple) throws Exception tracer.log (LogLevel.ERROR, "Tuple dropped. Final failure re-sending tuple: " + finalExc.toString()); //$NON-NLS-1$ // no further action; tuple is dropped and sent to error port if present } + } + if (!msgSent) { + if (tracer.isLoggable(LogLevel.FINE)) tracer.log(LogLevel.FINE, "tuple dropped"); //$NON-NLS-1$ + logger.log(LogLevel.ERROR, "EXCEPTION_SINK"); //$NON-NLS-1$ + if (hasErrorPort) { + // throws Exception + sendOutputErrorMsg(tuple, + Messages.getString("EXCEPTION_SINK")); //$NON-NLS-1$ + } } } else { @@ -761,17 +770,7 @@ public void process(StreamingInput stream, Tuple tuple) throws Exception // propagate all exceptions to the runtime to restart the consistent region in case of failure Message message = mhandler.convertTupleToMessage(tuple, jmsConnectionHelper.getSession()); - msgSent = jmsConnectionHelper.sendMessageNoRetry(message); - } - - if (!msgSent) { - if (tracer.isLoggable(LogLevel.FINE)) tracer.log(LogLevel.FINE, "tuple dropped"); //$NON-NLS-1$ - logger.log(LogLevel.ERROR, "EXCEPTION_SINK"); //$NON-NLS-1$ - if (hasErrorPort) { - // throws Exception - sendOutputErrorMsg(tuple, - Messages.getString("EXCEPTION_SINK")); //$NON-NLS-1$ - } + jmsConnectionHelper.sendMessageNoRetry(message); } } From 54817928c0ced7f8976caadd1cec8838a4c39f6a Mon Sep 17 00:00:00 2001 From: Norbert Schulz Date: Fri, 27 Oct 2017 14:26:23 +0200 Subject: [PATCH 2/2] Bump version to v5.3.4 --- com.ibm.streamsx.messaging/info.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 7c7557c..6e67481 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -680,7 +680,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 5.3.3 + 5.3.4 4.2.0.0