From e2c44b653c331d3233d875e2ee0a46c4443347c3 Mon Sep 17 00:00:00 2001 From: conglisc Date: Wed, 4 Mar 2015 14:44:08 -0500 Subject: [PATCH 1/7] Fix for issue #89 --- .../XMSSink/XMSSink_cpp.cgt | 47 +++++++++++++++--- .../XMSSource/XMSSource_cpp.cgt | 49 ++++++++++++++++--- .../jms/ConnectionDocumentParser.java | 48 +++++++++++++++++- .../ibm/streamsx/messaging/jms/JMSSink.java | 2 +- .../ibm/streamsx/messaging/jms/JMSSource.java | 2 +- 5 files changed, 130 insertions(+), 18 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt index de8ac47..55a791c 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt @@ -126,8 +126,10 @@ else { #include "xms.hpp" #include "MessagingResource.h" +#include using namespace std; +using namespace streams_boost; <%SPL::CodeGen::implementationPrologue($model);%> @@ -167,16 +169,47 @@ MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContex # InitialContext and ConnectionFactory are guaranteed to be there by the XML Schema my $ic =$conn->getAttributeByName('initial_context'); + use URI; + + # support relative binding file path. + if(defined ($ic) && $ic ne '') { + my $url = new URI($ic); + if($url->scheme eq 'file' || $url->scheme eq '') { + $ic = $url->path; + if(substr($ic,-1) eq '/') { + $ic .= '.bindings'; + } + } + } + else { + $ic = './etc/.bindings'; + } + # Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this # If it starts with file:/// and it ends with a / then append .bindings - if ((substr($ic,0,8) eq 'file:///') && (substr($ic,-1) eq '/')){ - $ic .= '.bindings'; - } - - %> + #if ((substr($ic,0,8) eq 'file:///') && (substr($ic,-1) eq '/')){ + # $ic .= '.bindings'; + # } + # + #%> + + std::string initContext = "<%=$ic %>"; + + // binding file path does not have scheme specified indicating this is a file path + if(initContext.find("://") == std::string::npos) { + streams_boost::filesystem::path filePath(initContext); + if(filePath.is_relative()) { + // if relative, convert to absolute path using the application directory as the base + filePath = streams_boost::filesystem::absolute(filePath, getPE().getApplicationDirectory()); + initContext = filePath.string(); + } + + initContext = "file://" + initContext; + } + - pInitialContext = new xms::String("<%=$ic %>"); + pInitialContext = new xms::String(initContext); pConnectionFactory = new xms::String("<%=$conn->getAttributeByName('connection_factory'); %>"); @@ -752,7 +785,7 @@ void MY_OPERATOR::process(Tuple const & tuple, uint32_t port) logmsg = STREAMS_EXCEPTION(ex.what(),ex.getExplanation()); SPLAPPLOG(L_ERROR, logmsg, "XMSSink"); - } catch (exception & ex) { + } catch (std::exception & ex) { nFailedInserts++; // Some other exception ErrMsg<<"Other Exception occurred when sending the message"< #include "xms.hpp" #include "MessagingResource.h" - +#include using namespace std; - +using namespace streams_boost; <%SPL::CodeGen::implementationPrologue($model);%> @@ -179,16 +179,49 @@ MY_OPERATOR::MY_OPERATOR(): nMessagesReadPC( getContext().getMetrics().getCustom # InitialContext and ConnectionFactory are guaranteed to be there by the XML Schema my $ic =$conn->getAttributeByName('initial_context'); + + use URI; + + # support relative binding file path. + if(defined ($ic) && $ic ne '') { + my $url = new URI($ic); + if($url->scheme eq 'file' || $url->scheme eq '') { + $ic = $url->path; + if(substr($ic,-1) eq '/') { + $ic .= '.bindings'; + } + } + } + else { + $ic = './etc/.bindings'; + } + # Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this - if ((substr($ic,0,8) eq 'file:///') && # If it starts with file:/// - (substr($ic,-1) eq '/')) # and it ends with a / - { - $ic .= '.bindings'; # append .bindings - } + #if ((substr($ic,0,8) eq 'file:///') && # If it starts with file:/// + #(substr($ic,-1) eq '/')) # and it ends with a / + #{ + # $ic .= '.bindings'; # append .bindings + #} %> - pInitialContext = new xms::String("<%=$ic %>"); + + std::string initContext = "<%=$ic %>"; + + // binding file path does not have scheme specified indicating this is a file path + if(initContext.find("://") == std::string::npos) { + streams_boost::filesystem::path filePath(initContext); + + if(filePath.is_relative()) { + // if relative, convert to absolute path using the application directory as the base + filePath = streams_boost::filesystem::absolute(filePath, getPE().getApplicationDirectory()); + initContext = filePath.string(); + } + + initContext = "file://" + initContext; + } + + pInitialContext = new xms::String(initContext); pConnectionFactory = new xms::String("<%=$conn->getAttributeByName('connection_factory'); %>"); <% diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java index 11c65b9..62da81c 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java @@ -4,8 +4,11 @@ *******************************************************************************/ package com.ibm.streamsx.messaging.jms; +import java.io.File; import java.io.IOException; import java.io.InputStream; +import java.net.MalformedURLException; +import java.net.URL; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -192,11 +195,52 @@ public MessageClass getMessageType() { return msgClass; } + + // Convert relative provider url path to absolute path for wmq only. + // non-absolute path should be relative to application directory. + // i.e file:./etc/ will be converted to applicationDir + ./etc/ + // If provider_url string is not provided + private void convertProviderURLPath(File applicationDir) throws ParseConnectionDocumentException { + + if(!isAMQ()) { + + // default case when provider_url is not specified or left empty + if(this.providerURL == null || this.providerURL.trim().length() == 0) { + this.providerURL = "file://" + applicationDir + "/etc/"; + } + else { // provider_url has a value specified + // provider_url starts with file protocol. + if(this.providerURL.startsWith("file")){ + try { + URL url = new URL(providerURL); + String path = url.getPath(); + + if(!path.startsWith("/")) { + URL absProviderURL = new URL(url.getProtocol(), url.getHost(), applicationDir + File.separator + path); + this.providerURL = absProviderURL.toString(); + } + } catch (MalformedURLException e) { + throw new ParseConnectionDocumentException(e.getMessage()); + } + } + else { // no file protocol specified, convert it to absolute and prefix file protocol + // it is a absolute path + if(this.providerURL.startsWith("/")) { + this.providerURL = "file://" + this.providerURL; + } + else { // relative path is specified + this.providerURL = "file://" + applicationDir + File.separator + this.providerURL; + } + + } + } + } + } // subroutine to parse and validate the connection document // called by both the JMSSink and JMSSource public void parseAndValidateConnectionDocument(String connectionDocument, String connection, String access, - StreamSchema streamSchema, boolean isProducer) throws ParseConnectionDocumentException, SAXException, + StreamSchema streamSchema, boolean isProducer, File applicationDir) throws ParseConnectionDocumentException, SAXException, IOException, ParserConfigurationException { // validate the connections document against the xsd validateConnectionsXML(connectionDocument); @@ -217,6 +261,8 @@ public void parseAndValidateConnectionDocument(String connectionDocument, String if (msgClass != MessageClass.empty) { nativeSchemaChecks(isProducer, streamSchema, nativeSchema); } + // convert provider_url to absolute if needed + convertProviderURLPath(applicationDir); } // subroutine to validate the connections document against the xsd diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java index 3b086dd..7cf322e 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSink.java @@ -395,7 +395,7 @@ public synchronized void initialize(OperatorContext context) ConnectionDocumentParser connectionDocumentParser = new ConnectionDocumentParser(); connectionDocumentParser.parseAndValidateConnectionDocument( - getConnectionDocument(), connection, access, streamSchema, true); + getConnectionDocument(), connection, access, streamSchema, true, context.getPE().getApplicationDirectory()); // codepage parameter can come only if message class is bytes // Since the message class is extracted runtime during the parsing of diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java index ca47aef..bc936c1 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/JMSSource.java @@ -332,7 +332,7 @@ public synchronized void initialize(OperatorContext context) // the operator throws a runtime error and abort connectionDocumentParser.parseAndValidateConnectionDocument( - getConnectionDocument(), connection, access, streamSchema, false); + getConnectionDocument(), connection, access, streamSchema, false, context.getPE().getApplicationDirectory()); // codepage parameter can come only if message class is bytes From 6d96403588f7b0db5be9cd07128e014b0a080a80 Mon Sep 17 00:00:00 2001 From: conglisc Date: Wed, 4 Mar 2015 14:44:51 -0500 Subject: [PATCH 2/7] Fix for issue #94 --- .../messaging/mqtt/MqttSinkOperator.java | 29 ++++++++++++++----- .../messaging/mqtt/MqttSourceOperator.java | 20 ++++++++++--- .../messaging/mqtt/SPLDocConstants.java | 2 +- .../messaging/mqtt/messages.properties | 4 +-- 4 files changed, 41 insertions(+), 14 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java index 4bc6cb6..31afbc5 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java @@ -122,7 +122,14 @@ public void run() { String dataAttributeName = getDataAttributeName() == null ? IMqttConstants.MQTT_DEFAULT_DATA_ATTRIBUTE_NAME : getDataAttributeName(); int dataAttrIndex = streamSchema.getAttributeIndex(dataAttributeName); - Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttributeName).getType().getMetaType(); + + // if neither dataAttributeName is specified or schema attribute named "data" can be found + // then it is assumed this schema contains only a single attribute and it is the data attribute + if(dataAttrIndex == -1) { + dataAttrIndex = 0; + } + + Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttrIndex).getType().getMetaType(); boolean isBlob = false; if(dataAttributeType.equals(MetaType.BLOB)) @@ -381,17 +388,25 @@ public static void checkInputPortSchema(OperatorContextChecker checker) { if (inputPorts.size() > 0) { - // if user is not specifying dataAttributeName attribute, then we check if stream schema contains default data attribute + // if user is not specifying dataAttributeName attribute + // then we check if stream schema contains default data attribute + // or if schema contains only single attribute if(!checker.getOperatorContext().getParameterNames().contains("dataAttributeName")) { //$NON-NLS-1$ StreamingInput dataPort = inputPorts.get(0); StreamSchema streamSchema = dataPort.getStreamSchema(); - - Attribute data = streamSchema.getAttribute("data"); - + + Attribute dataAttribute = null; + if(streamSchema.getAttributeCount() == 1) { + dataAttribute = streamSchema.getAttribute(0); + } + else { + dataAttribute = streamSchema.getAttribute("data"); + } + // the default data attribute must be present and must be either BLOB or RSTRING - if(data != null) { - checker.checkAttributeType(data, MetaType.RSTRING, MetaType.BLOB ); + if(dataAttribute != null) { + checker.checkAttributeType(dataAttribute, MetaType.RSTRING, MetaType.BLOB ); } else { checker.setInvalidContext(Messages.getString("Error_MqttSinkOperator.5"), new Object[]{}); //$NON-NLS-1$ diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java index 185dee4..1629db1 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSourceOperator.java @@ -257,11 +257,18 @@ public static void checkOutputPort(OperatorContextChecker checker) { StreamingOutput dataPort = outputPorts.get(0); StreamSchema streamSchema = dataPort.getStreamSchema(); - Attribute data = streamSchema.getAttribute("data"); + Attribute dataAttribute = null; + + if(streamSchema.getAttributeCount() == 1) { + dataAttribute = streamSchema.getAttribute(0); + } + else { + dataAttribute = streamSchema.getAttribute("data"); + } // the default data attribute must be present and must be either BLOB or RSTRING - if(data != null) { - checker.checkAttributeType(data, MetaType.RSTRING, MetaType.BLOB ); + if(dataAttribute != null) { + checker.checkAttributeType(dataAttribute, MetaType.RSTRING, MetaType.BLOB ); } else { checker.setInvalidContext(Messages.getString("Error_MqttSourceOperator.0"), new Object[]{}); //$NON-NLS-1$ @@ -549,7 +556,12 @@ private void produceTuples() throws Exception { String dataAttributeName = this.getDataAttributeName() == null ? IMqttConstants.MQTT_DEFAULT_DATA_ATTRIBUTE_NAME : this.getDataAttributeName(); int dataAttrIndex = streamSchema.getAttributeIndex(dataAttributeName); - Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttributeName).getType().getMetaType(); + + if(dataAttrIndex == -1) { + dataAttrIndex = 0; + } + + Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttrIndex).getType().getMetaType(); boolean isBlob = false; if(dataAttributeType.equals(MetaType.BLOB)) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/SPLDocConstants.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/SPLDocConstants.java index 33b2764..d1407af 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/SPLDocConstants.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/SPLDocConstants.java @@ -65,5 +65,5 @@ public class SPLDocConstants { public static final String MQTT_PARAM_PASSWORD_DESC = "This optional parameter sets the password to use for the connection. Must be specified when userID parameter is used, or compile time error will occur"; public static final String MQTT_PARAM_COMMAND_TIMEOUT_DESC = "This optional parameter is used to specify maximum time in millisecond to wait for an MQTT action to complete instead of waiting until a specific action to finish such as message publish action. A value of 0 will wait until the action finishes and not timeout, negative number will cause a runtime error. By default, the operator will not timeout"; public static final String MQTT_PARAM_KEEP_ALIVE_INTERVAL_DESC = "This optional parameter, measured in seconds, sets the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available. By default, it is set to 60 seconds. A value of 0 will disable it. Negative number will cause a runtime error."; - public static final String MQTT_PARAM_DATA_ATTRIBUTE_DESC = "This optional parameter specifies the name of the attribute that is used to hold actual content of message, if not specified, default data attribute name is data"; + public static final String MQTT_PARAM_DATA_ATTRIBUTE_DESC = "This optional parameter specifies the name of the attribute that is used to hold actual content of message, if not specified, in the case where multiple attributes are defined for the streams schema, the operator will look for attribute named data and use it as data attribute. In the case where the schema contains only a signle attribute, the operator will assume that the attribute is the data attribute"; } diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/messages.properties b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/messages.properties index a02907d..ae6fdf9 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/messages.properties +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/messages.properties @@ -14,10 +14,10 @@ Error_MqttSinkOperator.2=CDIST1554E Unable to publish message Error_MqttSinkOperator.0=CDIST1555E Topic or Qos not valid, unable to publish message. Topic: {0} Qos: {1} Error_MqttSinkOperator.21=CDIST1556E Unable to process the control signal: {0} Error_MqttSinkOperator.22=CDIST1569E Unable to connect to server. -Error_MqttSinkOperator.5=CDIST1565E Neither the parameter dataAttributeName is provided nor the default data attribute from input port 0 is found in stream schema. +Error_MqttSinkOperator.5=CDIST1565E Data attribute is not found from input port 0 in stream schema, a valid data attribute can be specified with one of following methods: indicate the data attribute name through operator attribute dataAttributeName, name one of the stream schema attribute as "data", in the case where the schema contains only a single attribute, it will be automatically considered the data attribute if its type is either Rstring or Blob. Error_MqttSinkOperator.7=CDIST1557E At least one of the following attributes must be specified: topic, topicAttributeName Error_MqttSinkOperator.3=CDIST1578E Timed out waiting for tuples to finish draining. -Error_MqttSourceOperator.0=CDIST1564E Neither the parameter dataAttributeName is provided nor the default data attribute from output port 0 is found in stream schema. +Error_MqttSourceOperator.0=CDIST1564E Data attribute is not found from output port 0 in stream schema, a valid data attribute can be specified with one of following methods: indicate the data attribute name through operator attribute dataAttributeName, name one of the stream schema attribute as "data", in the case where the schema contains only a single attribute, it will be automatically considered the data attribute if its type is either Rstring or Blob. Error_MqttSourceOperator.24=CDIST1572E Control signal contains invalid qos, the signal is ignored. Topics: {0}, Qos: {1} Error_MqttSourceOperator.25=CDIST1574E [Request Queue:] MQTT Client Error while handling MQTT client request: {0} Error_MqttSourceOperator.26=CDIST1575E [Request Queue:] Runtime exception occurred while handling MQTT client requests: {0} From 51a1e4f976db39a6e9b8d35f0ef34400328a964d Mon Sep 17 00:00:00 2001 From: conglisc Date: Wed, 4 Mar 2015 15:18:08 -0500 Subject: [PATCH 3/7] Fix for issue #91 --- samples/JMSSample/Makefile | 2 +- samples/JmsWithXmlParse/Makefile | 2 +- samples/JmsWithXmlParseBytes/Makefile | 2 +- samples/KafkaSample/Makefile | 2 +- samples/MqttSample/Makefile | 2 +- samples/XMSSample/Makefile | 2 +- 6 files changed, 6 insertions(+), 6 deletions(-) diff --git a/samples/JMSSample/Makefile b/samples/JMSSample/Makefile index 3fab20c..59a03a4 100644 --- a/samples/JMSSample/Makefile +++ b/samples/JMSSample/Makefile @@ -2,7 +2,7 @@ # others. All Rights Reserved. .PHONY: all clean -STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging +STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} diff --git a/samples/JmsWithXmlParse/Makefile b/samples/JmsWithXmlParse/Makefile index 8460deb..9ace868 100644 --- a/samples/JmsWithXmlParse/Makefile +++ b/samples/JmsWithXmlParse/Makefile @@ -2,7 +2,7 @@ # others. All Rights Reserved. .PHONY: all clean -STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging +STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} diff --git a/samples/JmsWithXmlParseBytes/Makefile b/samples/JmsWithXmlParseBytes/Makefile index eb03000..770b7cb 100644 --- a/samples/JmsWithXmlParseBytes/Makefile +++ b/samples/JmsWithXmlParseBytes/Makefile @@ -2,7 +2,7 @@ # others. All Rights Reserved. .PHONY: all clean -STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging +STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} diff --git a/samples/KafkaSample/Makefile b/samples/KafkaSample/Makefile index 2be3738..ade0f29 100644 --- a/samples/KafkaSample/Makefile +++ b/samples/KafkaSample/Makefile @@ -2,7 +2,7 @@ # others. All Rights Reserved. .PHONY: all clean -STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging +STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} diff --git a/samples/MqttSample/Makefile b/samples/MqttSample/Makefile index cec7596..6ade6ff 100644 --- a/samples/MqttSample/Makefile +++ b/samples/MqttSample/Makefile @@ -2,7 +2,7 @@ # others. All Rights Reserved. .PHONY: all clean -STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging +STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} diff --git a/samples/XMSSample/Makefile b/samples/XMSSample/Makefile index 92413db..4c91830 100644 --- a/samples/XMSSample/Makefile +++ b/samples/XMSSample/Makefile @@ -3,7 +3,7 @@ .PHONY: all clean -STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging +STREAMS_MESSAGING_TOOLKIT ?= ../../com.ibm.streamsx.messaging:${STREAMS_INSTALL}/toolkits/com.ibm.streamsx.messaging SPLC_FLAGS ?= -a -t ${STREAMS_MESSAGING_TOOLKIT} From a017e2baa8157d5f3f1b54320862fb70d2e015bd Mon Sep 17 00:00:00 2001 From: conglisc Date: Mon, 30 Mar 2015 22:33:12 -0400 Subject: [PATCH 4/7] Reworked fix for issue #89 so the provider_url parameter now must take a valid form of URL value, it can be only either absolute (file:///a/b/c) or relative (file:etc/) value, which must include the scheme. --- .../XMSSink/XMSSink_cpp.cgt | 36 +++++++++----- .../XMSSource/XMSSource_cpp.cgt | 34 +++++++++----- .../jms/ConnectionDocumentParser.java | 47 ++++++++----------- 3 files changed, 65 insertions(+), 52 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt index 55a791c..9781959 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt @@ -172,17 +172,27 @@ MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContex use URI; # support relative binding file path. - if(defined ($ic) && $ic ne '') { - my $url = new URI($ic); - if($url->scheme eq 'file' || $url->scheme eq '') { - $ic = $url->path; - if(substr($ic,-1) eq '/') { - $ic .= '.bindings'; - } - } + if(!defined ($ic) || $ic eq '') { + SPL::CodeGen::exitln("A value must be specified for provider_url attribute in connection document."); + } + + my $url = new URI($ic); + + if($url->scheme eq '') { + SPL::CodeGen::exitln("Invalid provider_url format detected."); } - else { - $ic = './etc/.bindings'; + + if($url->scheme eq 'file') { + my $path = $url->path; + + # This is a relative path + if(substr($path, 0, 1) ne '/') { + $ic = $path; + } + # if it ends with a / then append .bindings + if(substr($ic,-1) eq '/') { + $ic .= '.bindings'; + } } # Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this @@ -191,12 +201,12 @@ MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContex # $ic .= '.bindings'; # } # - #%> + %> std::string initContext = "<%=$ic %>"; - // binding file path does not have scheme specified indicating this is a file path - if(initContext.find("://") == std::string::npos) { + // process relative file path + if(initContext.find("file:") == std::string::npos) { streams_boost::filesystem::path filePath(initContext); if(filePath.is_relative()) { diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt index 1057190..7bfcb30 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt @@ -183,17 +183,27 @@ MY_OPERATOR::MY_OPERATOR(): nMessagesReadPC( getContext().getMetrics().getCustom use URI; # support relative binding file path. - if(defined ($ic) && $ic ne '') { - my $url = new URI($ic); - if($url->scheme eq 'file' || $url->scheme eq '') { - $ic = $url->path; - if(substr($ic,-1) eq '/') { - $ic .= '.bindings'; - } - } + if(!defined ($ic) || $ic eq '') { + SPL::CodeGen::exitln("A value must be specified for provider_url attribute in connection document."); + } + + my $url = new URI($ic); + + if($url->scheme eq '') { + SPL::CodeGen::exitln("Invalid provider_url format detected."); } - else { - $ic = './etc/.bindings'; + + if($url->scheme eq 'file') { + my $path = $url->path; + + # This is a relative path + if(substr($path, 0, 1) ne '/') { + $ic = $path; + } + # if it ends with a / then append .bindings + if(substr($ic,-1) eq '/') { + $ic .= '.bindings'; + } } # Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this @@ -208,8 +218,8 @@ MY_OPERATOR::MY_OPERATOR(): nMessagesReadPC( getContext().getMetrics().getCustom std::string initContext = "<%=$ic %>"; - // binding file path does not have scheme specified indicating this is a file path - if(initContext.find("://") == std::string::npos) { + // process relative file path + if(initContext.find("file:") == std::string::npos) { streams_boost::filesystem::path filePath(initContext); if(filePath.is_relative()) { diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java index 62da81c..10fe8b0 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/jms/ConnectionDocumentParser.java @@ -199,41 +199,34 @@ public MessageClass getMessageType() { // Convert relative provider url path to absolute path for wmq only. // non-absolute path should be relative to application directory. // i.e file:./etc/ will be converted to applicationDir + ./etc/ - // If provider_url string is not provided private void convertProviderURLPath(File applicationDir) throws ParseConnectionDocumentException { if(!isAMQ()) { - // default case when provider_url is not specified or left empty + // provider_url can not be empty if(this.providerURL == null || this.providerURL.trim().length() == 0) { - this.providerURL = "file://" + applicationDir + "/etc/"; + throw new ParseConnectionDocumentException("A value must be specified for provider_url attribute in connection document"); } - else { // provider_url has a value specified - // provider_url starts with file protocol. - if(this.providerURL.startsWith("file")){ - try { - URL url = new URL(providerURL); - String path = url.getPath(); - - if(!path.startsWith("/")) { - URL absProviderURL = new URL(url.getProtocol(), url.getHost(), applicationDir + File.separator + path); - this.providerURL = absProviderURL.toString(); + + // provider_url has a value specified + try { + URL url = new URL(providerURL); + + // We only care about url with file scheme. + if("file".equalsIgnoreCase(url.getProtocol())) { + String path = url.getPath(); + + // relative path is considered being relative to the application directory + if(!path.startsWith("/")) { + URL absProviderURL = new URL(url.getProtocol(), url.getHost(), applicationDir.getAbsolutePath() + File.separator + path); + this.providerURL = absProviderURL.toExternalForm(); } - } catch (MalformedURLException e) { - throw new ParseConnectionDocumentException(e.getMessage()); - } - } - else { // no file protocol specified, convert it to absolute and prefix file protocol - // it is a absolute path - if(this.providerURL.startsWith("/")) { - this.providerURL = "file://" + this.providerURL; - } - else { // relative path is specified - this.providerURL = "file://" + applicationDir + File.separator + this.providerURL; - } - - } + } + + } catch (MalformedURLException e) { + throw new ParseConnectionDocumentException("Invalid provider_url value detected: " + e.getMessage()); } + } } From 35f975462f728280060fbf484606077d51f152e7 Mon Sep 17 00:00:00 2001 From: conglisc Date: Mon, 30 Mar 2015 22:34:15 -0400 Subject: [PATCH 5/7] Fixed issue #97 --- .../ibm/streamsx/messaging/mqtt/MqttSinkOperator.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java index 31afbc5..1755044 100644 --- a/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java +++ b/com.ibm.streamsx.messaging/impl/java/src/com/ibm/streamsx/messaging/mqtt/MqttSinkOperator.java @@ -32,6 +32,7 @@ import com.ibm.streams.operator.Type; import com.ibm.streams.operator.Type.MetaType; import com.ibm.streams.operator.compile.OperatorContextChecker; +import com.ibm.streams.operator.log4j.LoggerNames; import com.ibm.streams.operator.log4j.TraceLevel; import com.ibm.streams.operator.model.Icons; import com.ibm.streams.operator.model.InputPortSet; @@ -79,8 +80,10 @@ @Libraries(value = {"opt/downloaded/*"} ) @Icons(location16="icons/MQTTSink_16.gif", location32="icons/MQTTSink_32.gif") public class MqttSinkOperator extends AbstractMqttOperator implements StateHandler{ - - static Logger TRACE = Logger.getLogger(MqttSinkOperator.class); + + private static final String CLASS_NAME = "com.ibm.streamsx.messaging.mqtt.MqttSinkOperator"; + static Logger TRACE = Logger.getLogger(MqttSinkOperator.class); + static Logger LOGGER = Logger.getLogger(LoggerNames.LOG_FACILITY + "." + CLASS_NAME); // Parameters private String topic; @@ -318,7 +321,7 @@ public static void checkConsistentRegion(OperatorContextChecker checker) { // if there is a control port, a warning message is issued as control port is not supported in a consistent region if(inputPorts.size() > 1) { - TRACE.warn("Having a control port in a consistent region is not supported. The control information may not be replayed, persisted and restored correctly. You may need to manually replay the control signals to bring the operator back to a consistent state."); + LOGGER.warn("Having a control port in a consistent region is not supported. The control information may not be replayed, persisted and restored correctly. You may need to manually replay the control signals to bring the operator back to a consistent state."); } if(cContext.isStartOfRegion()) { From 5b31d57ad567207456b6ddaa86a5ca7f161ef2f4 Mon Sep 17 00:00:00 2001 From: conglisc Date: Mon, 30 Mar 2015 22:35:52 -0400 Subject: [PATCH 6/7] Updated info.xml to reflect changes made on provider_url parameter for JMS and XMS. --- com.ibm.streamsx.messaging/info.xml | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/com.ibm.streamsx.messaging/info.xml b/com.ibm.streamsx.messaging/info.xml index 5461b57..d49a120 100644 --- a/com.ibm.streamsx.messaging/info.xml +++ b/com.ibm.streamsx.messaging/info.xml @@ -222,6 +222,10 @@ The <JMS> element has the following attributes: Use this format if your application sends data to or receive messages from a WebSphere MQ queue or topic. This format specifies that the administered objects are stored in a file called `.bindings`, which is in the `/homes/abc/xyz/wef` directory. + * `file:abc/xyz/wef`: + Use this format if your application sends data to or receive messages from a WebSphere MQ queue or topic. + This format specifies that the administered objects are stored in a file called `.bindings`, + which is in the `abc/xyz/wef` directory relative to the application directory. * `tcp://machinename.com:61616`: Use this format if your application sends data to or receives messages from Apache ActiveMQ. * user @@ -292,11 +296,15 @@ The <XMS> element has four attributes: * initial_context * This mandatory attribute value is a URL that points to the directory service that contains the administered objects. It uses the reference JNDI implementation, which stores objects in the file system. - The syntax of the URL can take two forms: + The syntax of the URL can take four forms: * `file:///xxx/yyy/zzz`: This format specifies that the JNDI objects are stored in a file that is called `zzz`, which is in the directory `xxx/yyy`. + * `file:xxx/yyy/zzz`: + This format specifies that the JNDI objects are stored in a file that is called `zzz`, which is in the directory `xxx/yyy` relative to the application directory. * `file:///xxx/yyy/`: This format specifies that the JNDI objects are stored in a file that is called `.bindings`, which is in the directory `xxx/yyy`. + * `file:xxx/yyy/`: + This format specifies that the JNDI objects are stored in a file that is called `.bindings`, which is in the directory `xxx/yyy` relative to the application directory. By default, the MQExplorer tool puts its entries in a file called `.bindings`. * connection_factory * This mandatory attribute value is the name of the ConnectionFactory administered object. From 580cfe1226fcbfabc04169242e8a6a8010631e83 Mon Sep 17 00:00:00 2001 From: conglisc Date: Tue, 31 Mar 2015 00:00:40 -0400 Subject: [PATCH 7/7] Updated error message for issue #89 --- .../com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt | 4 ++-- .../XMSSource/XMSSource_cpp.cgt | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt index 9781959..8a865c0 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSink/XMSSink_cpp.cgt @@ -173,13 +173,13 @@ MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContex # support relative binding file path. if(!defined ($ic) || $ic eq '') { - SPL::CodeGen::exitln("A value must be specified for provider_url attribute in connection document."); + SPL::CodeGen::exitln("A value must be specified for initial_context attribute in connection document."); } my $url = new URI($ic); if($url->scheme eq '') { - SPL::CodeGen::exitln("Invalid provider_url format detected."); + SPL::CodeGen::exitln("Invalid initial_context format detected."); } if($url->scheme eq 'file') { diff --git a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt index 7bfcb30..d466ff7 100644 --- a/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt +++ b/com.ibm.streamsx.messaging/com.ibm.streamsx.messaging.xms/XMSSource/XMSSource_cpp.cgt @@ -184,13 +184,13 @@ MY_OPERATOR::MY_OPERATOR(): nMessagesReadPC( getContext().getMetrics().getCustom # support relative binding file path. if(!defined ($ic) || $ic eq '') { - SPL::CodeGen::exitln("A value must be specified for provider_url attribute in connection document."); + SPL::CodeGen::exitln("A value must be specified for initial_context attribute in connection document."); } my $url = new URI($ic); if($url->scheme eq '') { - SPL::CodeGen::exitln("Invalid provider_url format detected."); + SPL::CodeGen::exitln("Invalid initial_context format detected."); } if($url->scheme eq 'file') {