Skip to content

Commit

Permalink
Merge pull request #329 from IBMStreams/develop
Browse files Browse the repository at this point in the history
Merge latest 'develop' changes to master
  • Loading branch information
schulz2 authored Oct 27, 2017
2 parents c3d87ff + 5481792 commit 29653c7
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -547,35 +547,16 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce
// Send message without retry in case of failure
// i.e connection problems, this method raise the error back to caller.
// No connection or message retry will be attempted.
boolean sendMessageNoRetry(Message message) throws JMSException {

boolean res = false;

void sendMessageNoRetry(Message message) throws JMSException {
try {

// try to send the message
synchronized (getSession()) {
getProducer().send(message);
res = true;

}
}
catch (JMSException e) {
// error has occurred, log error and try sending message again
logger.log(LogLevel.WARN, "ERROR_DURING_SEND", new Object[] { e.toString() }); //$NON-NLS-1$

// If the exception is caused by message format, then we can return peacefully as connection is still good.
if(!(e instanceof MessageFormatException)) {
throw e;
}

throw e;
}

if(!res) {
nFailedInserts.incrementValue(1);
}

return res;
}

// send a consistent region message to the consistent region queue
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -720,8 +720,8 @@ private void registerForDataGovernance(String providerURL, String destination) {
@Override
public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception {

boolean msgSent = false;
if(consistentRegionContext == null) {
boolean msgSent = false;
// Operator is not in a consistent region
if (isInitialConnection) {
jmsConnectionHelper.createInitialConnection();
Expand Down Expand Up @@ -752,6 +752,15 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception
tracer.log (LogLevel.ERROR, "Tuple dropped. Final failure re-sending tuple: " + finalExc.toString()); //$NON-NLS-1$
// no further action; tuple is dropped and sent to error port if present
}
}
if (!msgSent) {
if (tracer.isLoggable(LogLevel.FINE)) tracer.log(LogLevel.FINE, "tuple dropped"); //$NON-NLS-1$
logger.log(LogLevel.ERROR, "EXCEPTION_SINK"); //$NON-NLS-1$
if (hasErrorPort) {
// throws Exception
sendOutputErrorMsg(tuple,
Messages.getString("EXCEPTION_SINK")); //$NON-NLS-1$
}
}
}
else {
Expand All @@ -761,17 +770,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple) throws Exception
// propagate all exceptions to the runtime to restart the consistent region in case of failure
Message message = mhandler.convertTupleToMessage(tuple,
jmsConnectionHelper.getSession());
msgSent = jmsConnectionHelper.sendMessageNoRetry(message);
}

if (!msgSent) {
if (tracer.isLoggable(LogLevel.FINE)) tracer.log(LogLevel.FINE, "tuple dropped"); //$NON-NLS-1$
logger.log(LogLevel.ERROR, "EXCEPTION_SINK"); //$NON-NLS-1$
if (hasErrorPort) {
// throws Exception
sendOutputErrorMsg(tuple,
Messages.getString("EXCEPTION_SINK")); //$NON-NLS-1$
}
jmsConnectionHelper.sendMessageNoRetry(message);
}
}

Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/info.xml
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ The &lt;attribute> element has three possible attributes:
* composite types
* xml
</info:description>
<info:version>5.3.3</info:version>
<info:version>5.3.4</info:version>
<info:requiredProductVersion>4.2.0.0</info:requiredProductVersion>
</info:identity>
<info:dependencies/>
Expand Down

0 comments on commit 29653c7

Please sign in to comment.