Skip to content

Commit

Permalink
#99: add back consistent region support
Browse files Browse the repository at this point in the history
  • Loading branch information
conglisc committed Jul 31, 2015
1 parent 753d152 commit 82bcf75
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,24 @@ If this parameter is not specified, the operator uses the file that is in the de
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>triggerCount</name>
<description>
This optional parameter specifies how many messages are submitted before the JMSSource operator starts to drain the pipeline and establish a consistent state.
This parameter must be greater than zero and must be set if the JMSSource operator is the start operator of an operatorDriven consistent region.</description>
<optional>true</optional>
<type>int32</type>
<cardinality>1</cardinality>
</parameter>
<parameter>
<name>messageSelector</name>
<description>
This optional parameter is used as JMS Message Selector.
</description>
<optional>true</optional>
<type>rstring</type>
<cardinality>1</cardinality>
</parameter>
</parameters>
<inputPorts>
</inputPorts>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import com.ibm.streams.operator.state.ConsistentRegionContext;
import com.ibm.streams.operator.state.StateHandler;


//The JMSSink operator publishes data from Streams to a JMS Provider queue or a topic.

public class JMSSink extends AbstractOperator implements StateHandler{
Expand Down Expand Up @@ -158,6 +159,9 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) {
// Variable to define if the connection attempted to the JMSProvider is the
// first one.
private boolean isInitialConnection = true;

// consistent region context
private ConsistentRegionContext consistentRegionContext;

// Mandatory parameter access
@Parameter(optional = false)
Expand Down Expand Up @@ -373,7 +377,7 @@ public synchronized void initialize(OperatorContext context)

JmsClasspathUtil.setupClassPaths(context);

context.registerStateHandler(this);
consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class);

/*
* Set appropriate variables if the optional error output port is
Expand Down Expand Up @@ -414,7 +418,7 @@ public synchronized void initialize(OperatorContext context)
jmsConnectionHelper = new JMSConnectionHelper(reconnectionPolicy,
reconnectionBound, period, true, maxMessageSendRetries,
messageSendRetryDelay, connectionDocumentParser.getDeliveryMode(),
nReconnectionAttempts, nFailedInserts, logger);
nReconnectionAttempts, nFailedInserts, logger, (consistentRegionContext != null));
jmsConnectionHelper.createAdministeredObjects(
connectionDocumentParser.getInitialContextFactory(),
connectionDocumentParser.getProviderURL(),
Expand Down Expand Up @@ -486,6 +490,7 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
jmsConnectionHelper.createInitialConnection();
isInitialConnection = false;
}

// Construct the JMS message based on the message type taking the
// attributes from the tuple.
Message message = mhandler.convertTupleToMessage(tuple,
Expand All @@ -499,6 +504,8 @@ public void process(StreamingInput<Tuple> stream, Tuple tuple)
"Dropping this tuple since an exception occurred while sending.");
}
}



}

Expand Down Expand Up @@ -539,6 +546,11 @@ public void close() throws IOException {
@Override
public void checkpoint(Checkpoint checkpoint) throws Exception {
logger.log(LogLevel.INFO, "checkpoint " + checkpoint.getSequenceId());

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().commit();
}

}

@Override
Expand All @@ -549,11 +561,20 @@ public void drain() throws Exception {
@Override
public void reset(Checkpoint checkpoint) throws Exception {
logger.log(LogLevel.INFO, "Reset to checkpoint " + checkpoint.getSequenceId());

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().rollback();
}

}

@Override
public void resetToInitialState() throws Exception {
logger.log(LogLevel.INFO, "Reset to initial state");

if(jmsConnectionHelper.getSession() != null) {
jmsConnectionHelper.getSession().rollback();
}
}

@Override
Expand Down

0 comments on commit 82bcf75

Please sign in to comment.