Skip to content

Commit

Permalink
Merge pull request #111 from IBMStreams/revert-110-master
Browse files Browse the repository at this point in the history
Revert "JMS Operator Consistent Region Initial Update."
  • Loading branch information
chanskw committed Jul 16, 2015
2 parents c0b2dd6 + 54c7b9a commit 48244fc
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -265,24 +265,6 @@ If this parameter is not specified, the operator uses the file that is in the de
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>triggerCount</name>
<description>
This optional parameter specifies how many messages are submitted before the JMSSource operator starts to drain the pipeline and establish a consistent state.
This parameter must be greater than zero and must be set if the JMSSource operator is the start operator of an operatorDriven consistent region.</description>
<optional>true</optional>
<type>int32</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>messageSelector</name>
<description>
This optional parameter is used as JMS Message Selector.
</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
</parameters>
<inputPorts>
</inputPorts>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidSelectorException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
Expand Down Expand Up @@ -71,10 +70,6 @@ class JMSConnectionHelper {

// Time to wait before try to resend failed message
private final long messageRetryDelay;

private final boolean useClientAckMode;

private String messageSelector;

// procedure to detrmine if there exists a valid connection or not
private boolean isConnectValid() {
Expand Down Expand Up @@ -133,7 +128,7 @@ private Connection getConnect() {
JMSConnectionHelper(ReconnectionPolicies reconnectionPolicy,
int reconnectionBound, double period, boolean isProducer,
int maxMessageRetry, long messageRetryDelay,
String deliveryMode, Metric nReconnectionAttempts, Logger logger, boolean useClientAckMode, String messageSelector) {
String deliveryMode, Metric nReconnectionAttempts, Logger logger) {
this.reconnectionPolicy = reconnectionPolicy;
this.reconnectionBound = reconnectionBound;
this.period = period;
Expand All @@ -143,18 +138,16 @@ private Connection getConnect() {
this.nReconnectionAttempts = nReconnectionAttempts;
this.maxMessageRetries = maxMessageRetry;
this.messageRetryDelay = messageRetryDelay;
this.useClientAckMode = useClientAckMode;
this.messageSelector = messageSelector;
}

// This constructor sets the parameters required to create a connection for
// JMSSink
JMSConnectionHelper(ReconnectionPolicies reconnectionPolicy,
int reconnectionBound, double period, boolean isProducer,
int maxMessageRetry, long messageRetryDelay, String deliveryMode,
Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger, boolean useClientAckMode) {
Metric nReconnectionAttempts, Metric nFailedInserts, Logger logger) {
this(reconnectionPolicy, reconnectionBound, period, isProducer,
maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger, useClientAckMode, null);
maxMessageRetry, messageRetryDelay, deliveryMode, nReconnectionAttempts, logger);
this.nFailedInserts = nFailedInserts;

}
Expand Down Expand Up @@ -230,9 +223,6 @@ private synchronized void createConnection() throws ConnectionException,
break;
}

} catch (InvalidSelectorException e) {
throw new ConnectionException(
"Connection to JMS failed. Invalid message selector");
} catch (JMSException e) {
logger.log(LogLevel.ERROR, "RECONNECTION_EXCEPTION",
new Object[] { e.toString() });
Expand Down Expand Up @@ -282,14 +272,7 @@ private boolean connect(boolean isProducer) throws JMSException {

// Create session from connection; false means
// session is not transacted.

if(isProducer) {
setSession(getConnect().createSession(this.useClientAckMode, Session.AUTO_ACKNOWLEDGE));
}
else {
setSession(getConnect().createSession(false, (this.useClientAckMode) ? Session.CLIENT_ACKNOWLEDGE : Session.AUTO_ACKNOWLEDGE));
}

setSession(getConnect().createSession(false, Session.AUTO_ACKNOWLEDGE));

if (isProducer == true) {
// Its JMSSink, So we will create a producer
Expand All @@ -311,7 +294,7 @@ private boolean connect(boolean isProducer) throws JMSException {

} else {
// Its JMSSource, So we will create a consumer
setConsumer(getSession().createConsumer(dest, messageSelector));
setConsumer(getSession().createConsumer(dest));
// start the connection
getConnect().start();
}
Expand Down Expand Up @@ -369,12 +352,12 @@ boolean sendMessage(Message message) throws ConnectionException,
}

// this subroutine receives messages from a message consumer
Message receiveMessage(long timeout) throws ConnectionException, InterruptedException,
Message receiveMessage() throws ConnectionException, InterruptedException,
JMSException {
try {
// try to receive a message
synchronized (getSession()) {
return (getConsumer().receive(timeout));
return (getConsumer().receive());
}

}
Expand All @@ -397,16 +380,6 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce
}
}
}

public void recoverSession() throws JMSException {

if(getSession() != null) {
synchronized (getSession()) {
getSession().recover();
}
}

}

// close the connection
public void closeConnection() throws JMSException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;


//The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic.

public class JMSSink extends AbstractOperator implements StateHandler{
Expand Down Expand Up @@ -159,9 +158,6 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) {
// Variable to define if the connection attempted to the JMSProvider is the
// first one.
private boolean isInitialConnection = true;

// consistent region context
private ConsistentRegionContext consistentRegionContext;

// Mandatory parameter access
@Parameter(optional = false)
Expand Down Expand Up @@ -377,7 +373,7 @@ public synchronized void initialize(OperatorContext context)

JmsClasspathUtil.setupClassPaths(context);

consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class);
context.registerStateHandler(this);

/*
* Set appropriate variables if the optional error output port is
Expand Down Expand Up @@ -418,7 +414,7 @@ public synchronized void initialize(OperatorContext context)
jmsConnectionHelper = new JMSConnectionHelper(reconnectionPolicy,
reconnectionBound, period, true, maxMessageSendRetries,
messageSendRetryDelay, connectionDocumentParser.getDeliveryMode(),
nReconnectionAttempts, nFailedInserts, logger, (consistentRegionContext != null));
nReconnectionAttempts, nFailedInserts, logger);
jmsConnectionHelper.createAdministeredObjects(
connectionDocumentParser.getInitialContextFactory(),
connectionDocumentParser.getProviderURL(),
Expand Down Expand Up @@ -490,7 +486,6 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
jmsConnectionHelper.createInitialConnection();
isInitialConnection = false;
}

// Construct the JMS message based on the message type taking the
// attributes from the tuple.
Message message = mhandler.convertTupleToMessage(tuple,
Expand All @@ -504,8 +499,6 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
"Dropping this tuple since an exception occurred while sending.");
}
}



}

Expand Down Expand Up @@ -546,11 +539,6 @@ public void close() throws IOException {
@Override
public void checkpoint(Checkpoint checkpoint) throws Exception {
logger.log(LogLevel.INFO, "checkpoint " + checkpoint.getSequenceId());

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().commit();
}

}

@Override
Expand All @@ -561,20 +549,11 @@ public void drain() throws Exception {
@Override
public void reset(Checkpoint checkpoint) throws Exception {
logger.log(LogLevel.INFO, "Reset to checkpoint " + checkpoint.getSequenceId());

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().rollback();
}

}

@Override
public void resetToInitialState() throws Exception {
logger.log(LogLevel.INFO, "Reset to initial state");

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().rollback();
}
}

@Override
Expand Down
Loading

0 comments on commit 48244fc

Please sign in to comment.