diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/PropertyProvider.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/PropertyProvider.java index 0f0cf05..bc8a24f 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/PropertyProvider.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/common/PropertyProvider.java @@ -13,8 +13,8 @@ // This class provides configuration data stored in PE public class PropertyProvider { - private ProcessingElement pe; - private String configurationName; + private ProcessingElement pe = null; + private String configurationName = null; private Map configuration = null; public PropertyProvider(ProcessingElement pe, String configurationName) { diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java index dcf9dbc..22f53be 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSConnectionHelper.java @@ -17,7 +17,6 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageConsumer; -import javax.jms.MessageFormatException; import javax.jms.MessageProducer; import javax.jms.Session; import javax.naming.Context; @@ -25,6 +24,7 @@ import javax.naming.NamingException; import com.ibm.streams.operator.logging.LogLevel; +import com.ibm.streams.operator.logging.TraceLevel; import com.ibm.streams.operator.metrics.Metric; import com.ibm.streamsx.messaging.common.PropertyProvider; @@ -81,7 +81,7 @@ class JMSConnectionHelper implements ExceptionListener { private final boolean useClientAckMode; // JMS message selector - private String messageSelector; + private String messageSelector = null; // Timestamp of session creation private long sessionCreationTime; @@ -97,12 +97,12 @@ class JMSConnectionHelper implements ExceptionListener { private PropertyProvider propertyProvider = null; - private String userPropName; + private String userPropName = null; - private String passwordPropName; + private String passwordPropName = null; // CR queue name - private String destinationCR; + private String destinationCR = null; private ConnectionDocumentParser connectionDocumentParser = null; @@ -226,7 +226,6 @@ private Connection getConnect() { this(connectionDocumentParser, reconnectionPolicy, reconnectionBound, period, isProducer, maxMessageRetry, messageRetryDelay, nReconnectionAttempts, logger, useClientAckMode, msgSelectorCR, propertyProvider, userPropName, passwordPropName, destinationCR); this.nFailedInserts = nFailedInserts; - } @@ -255,8 +254,8 @@ public void createInitialConnectionNoRetry() throws ConnectionException { // this subroutine creates the initial jndi context by taking the mandatory // and optional parameters - private void createAdministeredObjects() - throws NamingException { + private void createAdministeredObjects() throws NamingException { + tracer.log(TraceLevel.TRACE, "Begin createAdministeredObjects()"); //$NON-NLS-1$ // Create a JNDI API InitialContext object if none exists // create a properties object and add all the mandatory and optional @@ -289,14 +288,18 @@ private void createAdministeredObjects() destCR = (Destination) jndiContext.lookup(destinationCR); } + tracer.log(TraceLevel.TRACE, "End createAdministeredObjects()"); //$NON-NLS-1$ return; } // this subroutine creates the connection, it always verifies if we have a // successfull existing connection before attempting to create one. - private synchronized void createConnection() throws ConnectionException, - InterruptedException { + private synchronized void createConnection() throws ConnectionException, InterruptedException { + + tracer.log(TraceLevel.TRACE, "Begin createConnection()"); //$NON-NLS-1$ + int nConnectionAttempts = 0; + // Check if connection exists or not. if (!isConnectValid()) { @@ -350,16 +353,19 @@ else if (reconnectionPolicy == ReconnectionPolicies.BoundedRetry } // sleep for delay interval Thread.sleep(delay); - // Incremet the metric nReconnectionAttempts + // Increment the metric nReconnectionAttempts nReconnectionAttempts.incrementValue(1); } } } + tracer.log(TraceLevel.TRACE, "End createConnection()"); //$NON-NLS-1$ } private synchronized void createConnectionNoRetry() throws ConnectionException { + + tracer.log(TraceLevel.TRACE, "Begin createConnectionNoRetry()"); //$NON-NLS-1$ if (!isConnectValid()) { try { @@ -370,11 +376,14 @@ private synchronized void createConnectionNoRetry() throws ConnectionException { Messages.getString("CONNECTION_TO_JMS_FAILED_NO_RECONNECT_AS_RECONNECT_POLICY_DOES_NOT_APPLY")); //$NON-NLS-1$ } } + + tracer.log(TraceLevel.TRACE, "End createConnectionNoRetry()"); //$NON-NLS-1$ } // this subroutine creates the connection, producer and consumer, throws a // JMSException if it fails private boolean connect(boolean isProducer) throws JMSException { + tracer.log(TraceLevel.TRACE, "Begin connect()"); //$NON-NLS-1$ // Create connection. if (userPrincipal != null && !userPrincipal.isEmpty() && @@ -434,25 +443,37 @@ private boolean connect(boolean isProducer) throws JMSException { getConnect().start(); } tracer.log (LogLevel.INFO, "connection successfully created"); //$NON-NLS-1$ + tracer.log(TraceLevel.TRACE, "End connect()"); //$NON-NLS-1$ + // create connection is successful, return true return true; } private boolean refreshUserCredential() { - + tracer.log(TraceLevel.TRACE, "Begin refreshUserCredential()"); //$NON-NLS-1$ if(propertyProvider == null) { + tracer.log(TraceLevel.TRACE, "End refreshUserCredential() - there is no application configuration"); //$NON-NLS-1$ return false; } String userName = propertyProvider.getProperty(userPropName); String password = propertyProvider.getProperty(passwordPropName, false); + tracer.log(TraceLevel.DEBUG, "User name retrieved from application configuration: " + userName ); //$NON-NLS-1$ + if(password != null && !password.isEmpty()) + tracer.log(TraceLevel.DEBUG, "Password retrieved from application configuration"); //$NON-NLS-1$ + else + tracer.log(TraceLevel.DEBUG, "No password retrieved from application configuration"); //$NON-NLS-1$ + + // TODO: Aren't the following checks kind of redundant? if(this.userPrincipal == userName && this.userCredential == password) { + tracer.log(TraceLevel.TRACE, "End refreshUserCredential() - user credentials unchanged"); //$NON-NLS-1$ return false; } if((this.userPrincipal != null && userName != null && this.userPrincipal.equals(userName)) && (this.userCredential != null && password != null && this.userCredential.equals(password))) { + tracer.log(TraceLevel.TRACE, "End refreshUserCredential() - user credentials unchanged"); //$NON-NLS-1$ return false; } @@ -460,19 +481,20 @@ private boolean refreshUserCredential() { this.userPrincipal = userName; this.userCredential = password; + tracer.log(TraceLevel.TRACE, "End refreshUserCredential()"); //$NON-NLS-1$ return true; } // subroutine which on receiving a message, send it to the // destination,update the metric // nFailedInserts if the send fails + boolean sendMessage(Message message) throws ConnectionException, InterruptedException { - boolean sendMessage(Message message) throws ConnectionException, - InterruptedException { + tracer.log(TraceLevel.TRACE, "Begin sendMessage()"); //$NON-NLS-1$ boolean res = false; int count = 0; - + do { try { @@ -509,13 +531,16 @@ boolean sendMessage(Message message) throws ConnectionException, nFailedInserts.incrementValue(1); } + tracer.log(TraceLevel.TRACE, "End sendMessage()"); //$NON-NLS-1$ return res; } // this subroutine receives messages from a message consumer // This method supports the receive method with timeout - Message receiveMessage(long timeout) throws ConnectionException, InterruptedException, - JMSException { + Message receiveMessage(long timeout) throws ConnectionException, InterruptedException, JMSException { + + tracer.log(TraceLevel.TRACE, "Into receiveMessage()"); //$NON-NLS-1$ + try { // try to receive a message via blocking method synchronized (getSession()) { @@ -548,6 +573,7 @@ Message receiveMessage(long timeout) throws ConnectionException, InterruptedExce // i.e connection problems, this method raise the error back to caller. // No connection or message retry will be attempted. void sendMessageNoRetry(Message message) throws JMSException { + tracer.log(TraceLevel.TRACE, "Begin sendMessageNoRetry()"); //$NON-NLS-1$ try { synchronized (getSession()) { getProducer().send(message); @@ -557,20 +583,26 @@ void sendMessageNoRetry(Message message) throws JMSException { logger.log(LogLevel.WARN, "ERROR_DURING_SEND", new Object[] { e.toString() }); //$NON-NLS-1$ throw e; } + tracer.log(TraceLevel.TRACE, "End sendMessageNoRetry()"); //$NON-NLS-1$ } // send a consistent region message to the consistent region queue void sendCRMessage(Message message) throws JMSException { + + tracer.log(TraceLevel.TRACE, "Begin sendCRMessage()"); //$NON-NLS-1$ synchronized (getSession()) { getProducerCR().send(message); } - + + tracer.log(TraceLevel.TRACE, "End sendCRMessage()"); //$NON-NLS-1$ } // receive a message from consistent region queue Message receiveCRMessage(long timeout) throws JMSException { + tracer.log(TraceLevel.TRACE, "Into receiveCRMessage()"); //$NON-NLS-1$ + synchronized (getSession()) { return (getConsumerCR().receive(timeout)); } @@ -579,6 +611,8 @@ Message receiveCRMessage(long timeout) throws JMSException { // Recovers session causing unacknowledged message to be re-delivered public void recoverSession() throws JMSException, ConnectionException, InterruptedException { + tracer.log(TraceLevel.TRACE, "Begin recoverSession()"); //$NON-NLS-1$ + try { synchronized (getSession()) { tracer.log(LogLevel.INFO, "recoverSession"); //$NON-NLS-1$ @@ -596,25 +630,35 @@ public void recoverSession() throws JMSException, ConnectionException, Interrupt getSession().recover(); } } + + tracer.log(TraceLevel.TRACE, "End recoverSession()"); //$NON-NLS-1$ } public void commitSession() throws JMSException { + tracer.log(TraceLevel.TRACE, "Begin commitSession()"); //$NON-NLS-1$ + synchronized (getSession()) { getSession().commit(); } + + tracer.log(TraceLevel.TRACE, "End commitSession()"); //$NON-NLS-1$ } public void roolbackSession() throws JMSException { + tracer.log(TraceLevel.TRACE, "Begin roolbackSession()"); //$NON-NLS-1$ + synchronized (getSession()) { getSession().rollback(); } + + tracer.log(TraceLevel.TRACE, "End roolbackSession()"); //$NON-NLS-1$ } // close and invalidate the connection public void closeConnection() { - + tracer.log(TraceLevel.TRACE, "Begin closeConnection()"); //$NON-NLS-1$ if (getSession() != null) { try { getSession().close(); @@ -632,5 +676,6 @@ public void closeConnection() { setConnect(null); } } + tracer.log(TraceLevel.TRACE, "End closeConnection()"); //$NON-NLS-1$ } } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index 929c4ed..c0e6f92 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -176,30 +176,30 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { private ConsistentRegionContext consistentRegionContext; // CR queue name for storing checkpoint information - private String consistentRegionQueueName; + private String consistentRegionQueueName = null; // variable to keep track of last successful check point sequeuce id. private long lastSuccessfulCheckpointId = 0; // unique id to identify messages on CR queue - private String operatorUniqueID; + private String operatorUniqueID = null; // application configuration name - private String appConfigName; + private String appConfigName = null; // user property name stored in application configuration - private String userPropName; + private String userPropName = null; // password property name stored in application configuration - private String passwordPropName; + private String passwordPropName = null; - private String keyStore; + private String keyStore = null; - private String trustStore; + private String trustStore = null; - private String keyStorePassword; + private String keyStorePassword = null; - private String trustStorePassword; + private String trustStorePassword = null; private boolean sslConnection; @@ -340,7 +340,7 @@ public void setConnectionDocument(String connectionDocument) { } public String getConnectionDocument() { - + if (connectionDocument == null) { connectionDocument = getOperatorContext().getPE().getApplicationDirectory() + "/etc/connections.xml"; //$NON-NLS-1$ @@ -444,6 +444,9 @@ public static void checkCompileTimeConsistentRegion(OperatorContextChecker check */ @ContextCheck(compile = false) public static void checkParametersRuntime(OperatorContextChecker checker) { + + tracer.log(TraceLevel.TRACE, "Begin checkParametersRuntime()"); //$NON-NLS-1$ + OperatorContext context = checker.getOperatorContext(); if ((context.getParameterNames().contains("reconnectionBound"))) { //$NON-NLS-1$ @@ -536,6 +539,8 @@ public static void checkParametersRuntime(OperatorContextChecker checker) { } } + + tracer.log(TraceLevel.TRACE, "End checkParametersRuntime()"); //$NON-NLS-1$ } // add compile time check for either period or reconnectionBound to be @@ -565,6 +570,9 @@ public synchronized void initialize(OperatorContext context) throws ParserConfigurationException, InterruptedException, IOException, ParseConnectionDocumentException, SAXException, NamingException, ConnectionException, Exception { + + tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$ + super.initialize(context); JmsClasspathUtil.setupClassPaths(context); @@ -694,6 +702,8 @@ public synchronized void initialize(OperatorContext context) // register for data governance registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination()); + + tracer.log(TraceLevel.TRACE, "End initialize()"); //$NON-NLS-1$ } protected String getAbsolutePath(String filePath) { diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index 4914c25..c69036e 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -130,10 +130,10 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { // UTF-8 private String codepage = "UTF-8"; //$NON-NLS-1$ // This mandatory parameter access specifies access specification name. - private String access; + private String access = null; // This mandatory parameter connection specifies name of the connection // specification containing a JMS element - private String connection; + private String connection = null; // This optional parameter connectionDocument specifies the pathname of a // file containing the connection information. // If present, it must have exactly one value that is a String constant. @@ -183,21 +183,21 @@ public void setnReconnectionAttempts(Metric nReconnectionAttempts) { private Object resetLock = new Object(); // application configuration name - private String appConfigName; + private String appConfigName = null; // user property name stored in application configuration - private String userPropName; + private String userPropName = null; // password property name stored in application configuration - private String passwordPropName; + private String passwordPropName = null; - private String keyStore; + private String keyStore = null; - private String trustStore; + private String trustStore = null; - private String keyStorePassword; + private String keyStorePassword = null; - private String trustStorePassword; + private String trustStorePassword = null; private boolean sslConnection; @@ -478,6 +478,8 @@ public static void checkErrorOutputPort(OperatorContextChecker checker) { */ @ContextCheck(compile = false) public static void checkParametersRuntime(OperatorContextChecker checker) { + + tracer.log(TraceLevel.TRACE, "Begin checkParametersRuntime()"); //$NON-NLS-1$ OperatorContext context = checker.getOperatorContext(); @@ -569,7 +571,8 @@ public static void checkParametersRuntime(OperatorContextChecker checker) { new Object[] {passwordPropName, appConfigName}); } } - + + tracer.log(TraceLevel.TRACE, "End checkParametersRuntime()"); //$NON-NLS-1$ } // add check for reconnectionPolicy is present if either period or @@ -592,6 +595,8 @@ public synchronized void initialize(OperatorContext context) IOException, ParseConnectionDocumentException, SAXException, NamingException, ConnectionException, Exception { + tracer.log(TraceLevel.TRACE, "Begin initialize()"); //$NON-NLS-1$ + super.initialize(context); consistentRegionContext = context.getOptionalContext(ConsistentRegionContext.class); @@ -690,6 +695,7 @@ public synchronized void initialize(OperatorContext context) // register for data governance registerForDataGovernance(connectionDocumentParser.getProviderURL(), connectionDocumentParser.getDestination()); + tracer.log(TraceLevel.TRACE, "End initialize()"); //$NON-NLS-1$ } protected String getAbsolutePath(String filePath) { diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 3b08938..784e785 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -684,7 +684,7 @@ The <attribute> element has three possible attributes: * composite types * xml - 5.3.9 + 5.3.10 4.2.0.0 diff --git a/com.ibm.streamsx.messaging/pom.xml b/com.ibm.streamsx.messaging/pom.xml index ebe6f0c..733df9a 100644 --- a/com.ibm.streamsx.messaging/pom.xml +++ b/com.ibm.streamsx.messaging/pom.xml @@ -6,7 +6,7 @@ com.ibm.streamsx.messaging streamsx.messaging jar - 1.0.0 + 5.3.10 com.ibm.streamsx.messaging