Skip to content

Commit

Permalink
Merge pull request #322 from IBMStreams/develop
Browse files Browse the repository at this point in the history
Pull latest develop changes into master branch
  • Loading branch information
schulz2 authored Oct 5, 2017
2 parents c701cab + 9aeb839 commit 4bf8e17
Show file tree
Hide file tree
Showing 17 changed files with 1,108 additions and 961 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -350,29 +350,13 @@ MY_OPERATOR::MY_OPERATOR(): nMessagesReadPC( getContext().getMetrics().getCustom
pQueueURI = NULL;
pTopicURI = NULL;
pDest = NULL;

iDeliveryMode = XMSC_DELIVERY_NOT_PERSISTENT;

fatalError = false;
connected = false;

iDeliveryMode = XMSC_DELIVERY_NOT_PERSISTENT;
periodVal = <%=$periodVal%>;
//Now attempt to create the XMS objects
if (createAdminObjects() != XMS_OK)
fatalError = true;
else if (createXMS(CONSUMER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal) == XMS_OK)
{
connected = true;
//In case of initial connection failure, we need to set a flag to avoid reconnection when the MQ is down from the beginning, to distinguish the case for transient conection failure
isInitialConnectionFailure=false;

}
else
{
isInitialConnectionFailure=true;
}
fatalError = createAdminObjects() != XMS_OK;
SPLAPPTRC(L_DEBUG, "Exit: Constructor", "XMSSource");

if (fatalError) throw xms::Exception();
}


Expand All @@ -399,10 +383,7 @@ void MY_OPERATOR::allPortsReady()
{
// Notifies that all ports are ready. No tuples should be submitted before
// this. Source operators can use this method to spawn threads.


createThreads(1); // Create source thread

}


Expand All @@ -418,39 +399,45 @@ void MY_OPERATOR::initialize()
// Processing for source and threaded operators
void MY_OPERATOR::process(uint32_t idx)
{
SPLAPPTRC(L_TRACE, "Entry: process()", "XMSSource");
SPL::rstring logmsg;
ostringstream ErrMsg;
initialize();
SPLAPPTRC(L_TRACE, "Entry: process()", "XMSSource");

if (fatalError == true){

SPLAPPLOG(L_ERROR, MSGTK_PREVIOUS_ERROR, "XMSSource");
<% if (defined $operatorErrorPort ) { %>
ErrMsg<<MSGTK_PREVIOUS_ERROR;
sendOutputErrorMsg(ErrMsg);
<% }%>
}

else{
try
{
consumer.setMessageListener(this);
conn.start();
SPLAPPTRC(L_TRACE, "Message Listener attached", "XMSSource");
getPE().blockUntilShutdownRequest();
if (!getPE().getShutdownRequested()) {
try {
// createXMS(...) can block for a long time dependent on reconnection policy;
// in case of infinite retry even until a shutdown has been requested.
// Thats why we check for shutdown request after return of this method.
xmsINT rc = createXMS(CONSUMER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal);
if (!getPE().getShutdownRequested()) {
if (rc == XMS_OK) {
connected = true;
conn.setExceptionListener (this);
SPLAPPTRC(L_DEBUG, "Exception Listener attached", "XMSSource");
consumer.setMessageListener(this);
SPLAPPTRC(L_DEBUG, "Message Listener attached", "XMSSource");
conn.start();
SPLAPPTRC(L_DEBUG, "Connection started", "XMSSource");
getPE().blockUntilShutdownRequest();
}
else {
isInitialConnectionFailure=true;
throw xms::Exception();
}
}
}
catch (xms::Exception & ex) {
logmsg = MSGTK_MESSAGE_LISTENER_ERROR(ex.getErrorString().c_str());
SPLAPPLOG(L_ERROR, logmsg, "XMSSource");
<% if (defined $operatorErrorPort ) { %>
ErrMsg<<logmsg;
sendOutputErrorMsg(ErrMsg);
ErrMsg<<logmsg;
sendOutputErrorMsg(ErrMsg);
<% }%>
processException(ex);
throw;
}
SPLAPPTRC(L_TRACE, "Exit: process()", "XMSSource");
}
}
SPLAPPTRC(L_TRACE, "Exit: process()", "XMSSource");
}


Expand All @@ -469,6 +456,78 @@ inline std::string MY_OPERATOR::xms2std(const xms::String & pString)
return std::string(pString.c_str());
}

// ExceptionListener implementation.
xmsVOID MY_OPERATOR::onException (xms::Exception * pExp) {
ostringstream ost;
pExp->dump (ost);
SPLAPPTRC (L_ERROR, "onException: " << ost.str(), "XMSSource");
processException (*pExp);
delete pExp;

// immediate reconnection would fail - sleep for reconnection or initial period
double sleepTime = <%=$initDelayParmValue%> > 0? <%=$initDelayParmValue%>: periodVal;
SPLAPPTRC (L_INFO, "onException: waiting for "<< sleepTime << " seconds before reconnection attempt", "XMSSource");
getPE().blockUntilShutdownRequest (sleepTime);
if (getPE().getShutdownRequested()) {
return;
}

try {
conn.stop();
} catch (xms::Exception & ex) { /*ignore*/ }
try {
conn.close();
} catch (xms::Exception & ex) { /*ignore*/ }
// createXMS can block a longer - time dependent on reconnection policy,
// in case of infinite retry until a shutdown has been requested.
// Thats why we check for shutdown request after return of this method.
xmsINT resultCode = createXMS (CONSUMER,<%=$reconnectionPolicy%>,<%=$reconnectionBound%>,(xmsFLOAT)periodVal);

if (getPE().getShutdownRequested()) {
return;
}

if (resultCode == XMS_OK) {
try {
conn.setExceptionListener (this);
SPLAPPTRC(L_DEBUG, "Exception Listener attached", "XMSSource");
consumer.setMessageListener(this);
SPLAPPTRC(L_DEBUG, "Message Listener attached", "XMSSource");
conn.start();
SPLAPPTRC(L_DEBUG, "Connection started", "XMSSource");
}
catch (xms::Exception & ex) {
SPL::rstring logmsg;
ostringstream ErrMsg;
ex.dump (ErrMsg);
logmsg = MSGTK_MESSAGE_LISTENER_ERROR(ex.getErrorString().c_str());
SPLAPPLOG(L_ERROR, logmsg, "XMSSource");
SPLAPPTRC(L_ERROR, "exception handling failed: " << ErrMsg.str(), "XMSSource");
<% if (defined $operatorErrorPort ) { %>
ErrMsg<<logmsg;
sendOutputErrorMsg(ErrMsg);
<% }%>
processException(ex);
}
}
else {
SPL::rstring logmsg;
ostringstream ErrMsg;
logmsg = MSGTK_PREVIOUS_ERROR;
SPLAPPLOG(L_ERROR, logmsg, "XMSSource");
SPLAPPTRC(L_ERROR, logmsg << ": " << resultCode, "XMSSource");
<% if (defined $operatorErrorPort ) { %>
ErrMsg<<logmsg << ": " << resultCode;
sendOutputErrorMsg(ErrMsg);
<% }%>
// Reconnecting failed. The operator will never again receive data from QM due to an error condition.
// The PE would appear healthy, but this operator would not be functional any more.
// To indicate this serious problem, shutdown the PE by throwing an exception.
throw xms::Exception();
}
}


/*
* SYNOPSIS: Overloaded virtual method for the message listener. This
* method will be called when a message is received.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

<%SPL::CodeGen::headerPrologue($model);%>

class MY_OPERATOR : public MY_BASE_OPERATOR ,public xms::MessageListener {
class MY_OPERATOR : public MY_BASE_OPERATOR ,public xms::MessageListener, public xms::ExceptionListener {

public:

Expand Down Expand Up @@ -40,6 +40,9 @@ public:
*/
virtual xmsVOID onMessage(xms::Message * pMsg);

// Exception listener implementation
virtual xmsVOID onException (xms::Exception * pExp);

// Performance Metrics
SPL::int64 nMessagesRead;
Metric & nMessagesReadPC;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
*******************************************************************************/
package com.ibm.streamsx.messaging.jms;

import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streamsx.messaging.common.PropertyProvider;

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
Expand All @@ -24,12 +23,15 @@
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import com.ibm.streams.operator.logging.LogLevel;
import com.ibm.streams.operator.metrics.Metric;
import com.ibm.streamsx.messaging.common.PropertyProvider;

/* This class contains all the connection related information, creating maintaining and closing a connection to the JMSProvider
* Sending and Receiving JMS messages
*/
class JMSConnectionHelper {
class JMSConnectionHelper implements ExceptionListener {

// variables required to create connection
// connection factory
Expand Down Expand Up @@ -182,7 +184,8 @@ private Connection getConnect() {

// logger to get error messages
private Logger logger;

private static final Logger tracer = Logger.getLogger(JMSConnectionHelper.class.getName());

// This constructor sets the parameters required to create a connection for
// JMSSource
JMSConnectionHelper(ConnectionDocumentParser connectionDocumentParser,ReconnectionPolicies reconnectionPolicy,
Expand Down Expand Up @@ -226,11 +229,21 @@ private Connection getConnect() {

}

// Method to create the initial connection

/**
* Called asynchronously to notify problems with the connection.
* Here we have no implementation except tracing the problem.
* @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
*/
@Override
public void onException (JMSException ex) {
tracer.log(LogLevel.ERROR, "onException: " + ex.toString()); //$NON-NLS-1$
}

// Method to create the initial connection
public void createInitialConnection() throws ConnectionException,
InterruptedException {
createConnection();
return;
}


Expand Down Expand Up @@ -369,7 +382,8 @@ private boolean connect(boolean isProducer) throws JMSException {
setConnect(connFactory.createConnection(userPrincipal, userCredential));
else
setConnect(connFactory.createConnection());

getConnect().setExceptionListener (this);

// Create session from connection; false means
// session is not transacted.

Expand All @@ -394,6 +408,7 @@ private boolean connect(boolean isProducer) throws JMSException {
getProducerCR().setTimeToLive(TimeUnit.MILLISECONDS.convert(7L, TimeUnit.DAYS));
getProducerCR().setDeliveryMode(DeliveryMode.PERSISTENT);
// start the connection
tracer.log (LogLevel.INFO, "going to start the connection for producer in client acknowledge mode ..."); //$NON-NLS-1$
getConnect().start();
}

Expand All @@ -415,8 +430,10 @@ private boolean connect(boolean isProducer) throws JMSException {
// Its JMSSource, So we will create a consumer
setConsumer(getSession().createConsumer(dest, messageSelector));
// start the connection
tracer.log (LogLevel.INFO, "going to start consumer connection ..."); //$NON-NLS-1$
getConnect().start();
}
tracer.log (LogLevel.INFO, "connection successfully created"); //$NON-NLS-1$
// create connection is successful, return true
return true;
}
Expand Down Expand Up @@ -506,6 +523,7 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce
}
}
catch (JMSException e) {
tracer.log (LogLevel.ERROR, "receiveMessage - " + e.toString()); //$NON-NLS-1$
// If the JMSSource operator was interrupted in middle
if (e.toString().contains("java.lang.InterruptedException")) { //$NON-NLS-1$
throw new java.lang.InterruptedException();
Expand All @@ -523,7 +541,6 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce
synchronized (getSession()) {
return (getConsumer().receive(timeout));
}

}
}

Expand Down Expand Up @@ -583,18 +600,20 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt

try {
synchronized (getSession()) {
tracer.log(LogLevel.INFO, "recoverSession"); //$NON-NLS-1$
getSession().recover();
tracer.log(LogLevel.INFO, "recoverSession - session recovered"); //$NON-NLS-1$
}
} catch (JMSException e) {

tracer.log(LogLevel.INFO, "attempting to reconnect"); //$NON-NLS-1$
logger.log(LogLevel.INFO, "ATTEMPT_TO_RECONNECT"); //$NON-NLS-1$
setConnect(null);
createConnection();

synchronized (getSession()) {
getSession().recover();
}

}
}

Expand All @@ -612,14 +631,25 @@ public void roolbackSession() throws JMSException {
}
}

// close the connection
public void closeConnection() throws JMSException {
// close and invalidate the connection
public void closeConnection() {

if (getSession() != null) {
getSession().close();
try {
getSession().close();
} catch (JMSException e) {
// ignore
}
}
if (getConnect() != null) {
getConnect().close();
try {
getConnect().close();
} catch (JMSException e) {
// ignore
}
finally {
setConnect(null);
}
}
}
}
Loading

0 comments on commit 4bf8e17

Please sign in to comment.