Skip to content

Commit

Permalink
Merge pull request #327 from IBMStreams/develop
Browse files Browse the repository at this point in the history
Merge latest changes from 'develop'
  • Loading branch information
schulz2 authored Oct 11, 2017
2 parents fed3794 + e744859 commit c3d87ff
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ DISCARD_MSG_TOO_SHORT=CDIST1470W The message is too short and was discarded.
DISCARD_WRONG_MESSAGE_TYPE=CDIST1471W The message is being discarded because the message type is incorrect. The expected type is: {0} message.
DISCARD_MSG_UNREADABLE=CDIST1474W An attempt was made to read a write-only message, and the message was discarded.
DISCARD_MESSAGE_MESSAGE_FORMAT_ERROR=CDIST1475W The type conversion was invalid. Hence the message was discarded.
DISCARD_MESSAGE_MESSAGE_VARIABLE=CDIST1379W The message was discarded for the following reason: {0}
DISCARD_TRANSACTION_AS_IT_HAS_BEEN_PROCESSED_LAST_TIME=CDIST1493I Discard this transaction as it has been processed last time.
DRAIN=Drain ...
ERROR_DURING_RECEIVE=CDIST1472W An error has happened during receive message call by the Consumer {0}
Expand Down Expand Up @@ -81,4 +82,4 @@ USER_CREDENTIALS_UPDATED=CDIST1546I User credentials has been updated
USERPRINCIPAL_AND_USERCREDENTIAL_MUST_BE_SET=CDIST1547E Only one of userPrinicpal, userCredential is set.
VALUE_OF_ACCESS_PARAM_NOT_FOUND_IN_CONN_DOC=CDIST1548E The value of the access parameter {0} is not found in the connections document
VALUE_OF_CONNECTION_PARAM_NOT_FOUND_IN_CONN_DOC=CDIST1549E The value of the connection parameter {0} is not found in the connections document.
VALUE_OF_CONNECTION_PARAM_NOT_THE_SAME_AS_CONN_USED_BY_ACCESS_ELEMENT=CDIST1428E The value of the connection parameter {0} is not the same as the connection used by access element {1} as mentioned in the uses_connection element in connections document.
VALUE_OF_CONNECTION_PARAM_NOT_THE_SAME_AS_CONN_USED_BY_ACCESS_ELEMENT=CDIST1428E The value of the connection parameter {0} is not the same as the connection used by access element {1} as mentioned in the uses_connection element in connections document.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UnsupportedEncodingException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
Expand Down Expand Up @@ -55,9 +54,10 @@ public class JMSSource extends ProcessTupleProducer implements StateHandler{
* log facility as a child of the {@link LoggerNames#LOG_FACILITY}
* {@code Logger}. The {@code Logger} uses a
*/
private static Logger logger = Logger.getLogger(LoggerNames.LOG_FACILITY
private static final Logger logger = Logger.getLogger(LoggerNames.LOG_FACILITY
+ "." + CLASS_NAME, "com.ibm.streamsx.messaging.jms.JMSMessages"); //$NON-NLS-1$ //$NON-NLS-2$

private static final Logger tracer = Logger.getLogger(CLASS_NAME);

// variable to hold the output port
private StreamingOutput<OutputTuple> dataOutputPort;

Expand Down Expand Up @@ -713,8 +713,7 @@ private void registerForDataGovernance(String providerURL, String destination) {
}

@Override
protected void process() throws UnsupportedEncodingException,
InterruptedException, ConnectionException, Exception {
protected void process() throws IOException, ConnectionException{

boolean isInConsistentRegion = consistentRegionContext != null;
boolean isTriggerOperator = isInConsistentRegion && consistentRegionContext.isTriggerOperator();
Expand All @@ -733,14 +732,16 @@ protected void process() throws UnsupportedEncodingException,
if(isInConsistentRegion) {
notifyResetLock(true);
}
} catch (Exception e1) {
} catch (InterruptedException e) {
return;
} catch (ConnectionException e) {

if(isInConsistentRegion) {
notifyResetLock(false);
}
// Initial connection fails to be created.
// throw the exception.
throw e1;
throw e;
}

long timeout = JMSSource.RECEIVE_TIMEOUT;
Expand Down Expand Up @@ -768,112 +769,150 @@ protected void process() throws UnsupportedEncodingException,
}
}

Message m = jmsConnectionHelper.receiveMessage(timeout);

if(m == null) {
Message m = null;
try {
m = jmsConnectionHelper.receiveMessage(timeout);
} catch (ConnectionException | InterruptedException e) {
throw e;
} catch (/*JMS*/Exception e) {
// receive retry after reconnection attempt failed
continue;
}
if (m == null) {
continue;
}

if(isInConsistentRegion) {
// following section takes care of possible duplicate messages
// i.e connection re-created due to failure causing unacknowledged message to be delivered again
// we don't want to process duplicate messages again.
if(crState.getLastMsgSent() == null) {
sessionCreationTime = jmsConnectionHelper.getSessionCreationTime();
}
else {
// if session has been re-created and message is duplicate,ignore
if(jmsConnectionHelper.getSessionCreationTime() > sessionCreationTime &&
isDuplicateMsg(m, crState.getLastMsgSent().getJMSTimestamp(), crState.getMsgIDWIthSameTS())) {
logger.log(LogLevel.INFO, "IGNORED_DUPLICATED_MSG", m.getJMSMessageID()); //$NON-NLS-1$
continue;
}
}
}

// here we definitely have a JMS message. If exceptions happen here, we lose a message.
// nMessagesRead indicates the number of messages which we have
// read from the JMS Provider successfully
nMessagesRead.incrementValue(1);
OutputTuple dataTuple = dataOutputPort.newTuple();
try {

// convert the message to the output Tuple using the appropriate
// message handler
MessageAction returnVal = messageHandlerImpl
.convertMessageToTuple(m, dataTuple);
if(isInConsistentRegion) {
// following section takes care of possible duplicate messages
// i.e connection re-created due to failure causing unacknowledged message to be delivered again
// we don't want to process duplicate messages again.
if(crState.getLastMsgSent() == null) {
sessionCreationTime = jmsConnectionHelper.getSessionCreationTime();
}
else {
// if session has been re-created and message is duplicate,ignore
if(jmsConnectionHelper.getSessionCreationTime() > sessionCreationTime &&
isDuplicateMsg(m, crState.getLastMsgSent().getJMSTimestamp(), crState.getMsgIDWIthSameTS())) {
logger.log(LogLevel.INFO, "IGNORED_DUPLICATED_MSG", m.getJMSMessageID()); //$NON-NLS-1$
continue;
}
}
}

OutputTuple dataTuple = dataOutputPort.newTuple();

// take an action based on the return type
switch (returnVal) {
// the message type is incorrect
case DISCARD_MESSAGE_WRONG_TYPE:
nMessagesDropped.incrementValue(1);

logger.log(LogLevel.WARN, "DISCARD_WRONG_MESSAGE_TYPE", //$NON-NLS-1$
new Object[] { messageType });

// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_WRONG_MESSAGE_TYPE", new Object[] { messageType })); //$NON-NLS-1$
}
break;
// if unexpected end of message has been reached
case DISCARD_MESSAGE_EOF_REACHED:
nMessagesDropped.incrementValue(1);
logger.log(LogLevel.WARN, "DISCARD_MSG_TOO_SHORT"); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MSG_TOO_SHORT")); //$NON-NLS-1$
}
break;
// Mesage is read-only
case DISCARD_MESSAGE_UNREADABLE:
nMessagesDropped.incrementValue(1);
logger.log(LogLevel.WARN, "DISCARD_MSG_UNREADABLE"); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MSG_UNREADABLE")); //$NON-NLS-1$
}
break;
case DISCARD_MESSAGE_MESSAGE_FORMAT_ERROR:
nMessagesDropped.incrementValue(1);
logger.log(LogLevel.WARN,
"DISCARD_MESSAGE_MESSAGE_FORMAT_ERROR"); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MESSAGE_MESSAGE_FORMAT_ERROR")); //$NON-NLS-1$
}
break;
// the message was read successfully
case SUCCESSFUL_MESSAGE:
if(msgIDAttrIndex != -1 && m.getJMSMessageID() != null) {
dataTuple.setObject(msgIDAttrIndex, new RString(m.getJMSMessageID()));
}
dataOutputPort.submit(dataTuple);
break;
// convert the message to the output Tuple using the appropriate
// message handler
MessageAction returnVal = messageHandlerImpl
.convertMessageToTuple(m, dataTuple);

// take an action based on the return type
switch (returnVal) {
// the message type is incorrect
case DISCARD_MESSAGE_WRONG_TYPE:
nMessagesDropped.incrementValue(1);

logger.log(LogLevel.WARN, "DISCARD_WRONG_MESSAGE_TYPE", //$NON-NLS-1$
new Object[] { messageType });

// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_WRONG_MESSAGE_TYPE", new Object[] { messageType })); //$NON-NLS-1$
}
break;
// if unexpected end of message has been reached
case DISCARD_MESSAGE_EOF_REACHED:
nMessagesDropped.incrementValue(1);
logger.log(LogLevel.WARN, "DISCARD_MSG_TOO_SHORT"); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MSG_TOO_SHORT")); //$NON-NLS-1$
}
break;
// Mesage is read-only
case DISCARD_MESSAGE_UNREADABLE:
nMessagesDropped.incrementValue(1);
logger.log(LogLevel.WARN, "DISCARD_MSG_UNREADABLE"); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MSG_UNREADABLE")); //$NON-NLS-1$
}
break;
case DISCARD_MESSAGE_MESSAGE_FORMAT_ERROR:
nMessagesDropped.incrementValue(1);
logger.log(LogLevel.WARN,
"DISCARD_MESSAGE_MESSAGE_FORMAT_ERROR"); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MESSAGE_MESSAGE_FORMAT_ERROR")); //$NON-NLS-1$
}
break;
// the message was read successfully
case SUCCESSFUL_MESSAGE:
if(msgIDAttrIndex != -1 && m.getJMSMessageID() != null) {
dataTuple.setObject(msgIDAttrIndex, new RString(m.getJMSMessageID()));
}
dataOutputPort.submit(dataTuple);
break;
default:
nMessagesDropped.incrementValue(1);
tracer.log(LogLevel.WARN, "No explicit handling for enum " + returnVal.getDeclaringClass().getSimpleName() + " value "
+ returnVal + " implemented in switch statement.", "JMSSource");
logger.log(LogLevel.WARN, "DISCARD_MESSAGE_MESSAGE_VARIABLE", "unexpected return value: " + returnVal); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MESSAGE_MESSAGE_VARIABLE", "unexpected return value: " + returnVal)); //$NON-NLS-1$
}
}

// set last processed message
if(isInConsistentRegion) {
crState.setLastMsgSent(m);
}

// If the consistent region is driven by operator, then
// 1. increase message counter
// 2. Call make consistent region if message counter reached the triggerCounter specified by user
if(isTriggerOperator) {
crState.increaseMsgCounterByOne();

if(crState.getMsgCounter() == getTriggerCount()){
consistentRegionContext.makeConsistent();
}
}
}

// set last processed message
if(isInConsistentRegion) {
crState.setLastMsgSent(m);
catch (Exception /*UnsupportedEncodingException | JMSException*/ e) {
nMessagesDropped.incrementValue(1);
tracer.log(LogLevel.WARN, "failed to convert JMS message to tuple: " + e.toString(), "JMSSource");
logger.log(LogLevel.WARN, "DISCARD_MESSAGE_MESSAGE_VARIABLE", e.toString()); //$NON-NLS-1$
// If the error output port is defined, redirect the error
// to error output port
if (hasErrorPort) {
sendOutputErrorMsg(Messages.getString("DISCARD_MESSAGE_MESSAGE_VARIABLE", e.toString()));
}
}

// If the consistent region is driven by operator, then
// 1. increase message counter
// 2. Call make consistent region if message counter reached the triggerCounter specified by user
if(isTriggerOperator) {
crState.increaseMsgCounterByOne();

if(crState.getMsgCounter() == getTriggerCount()){
consistentRegionContext.makeConsistent();
}
}

} catch (Exception Ex) {

} finally {
} catch (InterruptedException e) {
// Thread has been interrupted, interrupted state is set; we will leave the while loop - no further action
} catch (IOException | ConnectionException e) {
// problem resetting CR --> throw
// final problem with connection
throw e;
} catch (Exception e) {
// failed to send output tuple
nMessagesDropped.incrementValue(1);
tracer.log (LogLevel.WARN, "failed to submit output tuple: " + e.toString(), "JMSSource");
} finally {
if(consistentRegionContext != null) {
consistentRegionContext.releasePermit();
}
Expand All @@ -882,19 +921,22 @@ protected void process() throws UnsupportedEncodingException,
}

// Send the error message on to the error output port if one is specified
private void sendOutputErrorMsg(String errorMessage) throws Exception {
private void sendOutputErrorMsg(String errorMessage) {
OutputTuple errorTuple = errorOutputPort.newTuple();
String consolidatedErrorMessage = errorMessage;
// set the error message
errorTuple.setString(0, consolidatedErrorMessage);
// submit the tuple.
try {
errorOutputPort.submit(errorTuple);
} catch (Exception e) {
tracer.log (LogLevel.ERROR, "failed to submit error output tuple: " + e.toString(), "JMSSource");
}
}

@Override
public void shutdown() throws Exception {
// close the connection.

if (isAMQ) {
super.shutdown();
jmsConnectionHelper.closeConnection();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
# NLS_MESSAGEFORMAT_VAR
# NLS_ENCODING=UNICODE

INVALID_CONNECTION_DOC=CDIST155OE Invalid connection document.
INVALID_CONNECTION_DOC=CDIST1550E Invalid connection document.
NUMBER_OF_TOPICS_MUST_EQUAL_QOS_ENTRIES=CDIST1551E Number of topics must equal the number of qos entries.
UNABLE_TO_CONNECT_TO_SERVER=CDIST1569E Unable to connect to server.
UNABLE_TO_CONNECT_TO_SERVER_CLIENT_WRAPPER=CDIST1552E [Connect:] Unable to connect to server.
Expand Down
2 changes: 1 addition & 1 deletion com.ibm.streamsx.messaging/messageIdsOverview.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ CDIST1375E Kafka
CDIST1376E Kafka
CDIST1377E Kafka
CDIST1378E Kafka
CDIST1379
CDIST1379W JMS
CDIST1380
CDIST1381
CDIST1382
Expand Down

0 comments on commit c3d87ff

Please sign in to comment.