Skip to content

Commit

Permalink
Merge pull request #96 from conglisc/master
Browse files Browse the repository at this point in the history
Fix for #89, 91, 94
  • Loading branch information
chanskw committed Apr 22, 2015
2 parents 14f39b2 + 580cfe1 commit 9919ece
Show file tree
Hide file tree
Showing 16 changed files with 206 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,8 +126,10 @@ else {

#include "xms.hpp"
#include "MessagingResource.h"
#include <streams_boost/filesystem.hpp>

using namespace std;
using namespace streams_boost;


<%SPL::CodeGen::implementationPrologue($model);%>
Expand Down Expand Up @@ -167,16 +169,57 @@ MY_OPERATOR::MY_OPERATOR() : nTruncatedInserts(0),nTruncatedInsertsPC( getContex
# InitialContext and ConnectionFactory are guaranteed to be there by the XML Schema
my $ic =$conn->getAttributeByName('initial_context');

use URI;

# support relative binding file path.
if(!defined ($ic) || $ic eq '') {
SPL::CodeGen::exitln("A value must be specified for initial_context attribute in connection document.");
}

my $url = new URI($ic);

if($url->scheme eq '') {
SPL::CodeGen::exitln("Invalid initial_context format detected.");
}

if($url->scheme eq 'file') {
my $path = $url->path;

# This is a relative path
if(substr($path, 0, 1) ne '/') {
$ic = $path;
}
# if it ends with a / then append .bindings
if(substr($ic,-1) eq '/') {
$ic .= '.bindings';
}
}

# Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this
# If it starts with file:/// and it ends with a / then append .bindings
if ((substr($ic,0,8) eq 'file:///') && (substr($ic,-1) eq '/')){
$ic .= '.bindings';
}
#if ((substr($ic,0,8) eq 'file:///') && (substr($ic,-1) eq '/')){
# $ic .= '.bindings';
# }
#
%>

std::string initContext = "<%=$ic %>";

// process relative file path
if(initContext.find("file:") == std::string::npos) {
streams_boost::filesystem::path filePath(initContext);

if(filePath.is_relative()) {
// if relative, convert to absolute path using the application directory as the base
filePath = streams_boost::filesystem::absolute(filePath, getPE().getApplicationDirectory());
initContext = filePath.string();
}

initContext = "file://" + initContext;
}


pInitialContext = new xms::String("<%=$ic %>");
pInitialContext = new xms::String(initContext);
pConnectionFactory = new xms::String("<%=$conn->getAttributeByName('connection_factory'); %>");


Expand Down Expand Up @@ -752,7 +795,7 @@ void MY_OPERATOR::process(Tuple const & tuple, uint32_t port)
logmsg = STREAMS_EXCEPTION(ex.what(),ex.getExplanation());
SPLAPPLOG(L_ERROR, logmsg, "XMSSink");

} catch (exception & ex) {
} catch (std::exception & ex) {
nFailedInserts++;
// Some other exception
ErrMsg<<"Other Exception occurred when sending the message"<<ex.what();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,9 @@ else {
#include <SPL/Runtime/Common/RuntimeException.h>
#include "xms.hpp"
#include "MessagingResource.h"

#include <streams_boost/filesystem.hpp>
using namespace std;

using namespace streams_boost;


<%SPL::CodeGen::implementationPrologue($model);%>
Expand Down Expand Up @@ -179,16 +179,59 @@ MY_OPERATOR::MY_OPERATOR(): nMessagesReadPC( getContext().getMetrics().getCustom
# InitialContext and ConnectionFactory are guaranteed to be there by the XML Schema
my $ic =$conn->getAttributeByName('initial_context');


use URI;

# support relative binding file path.
if(!defined ($ic) || $ic eq '') {
SPL::CodeGen::exitln("A value must be specified for initial_context attribute in connection document.");
}

my $url = new URI($ic);

if($url->scheme eq '') {
SPL::CodeGen::exitln("Invalid initial_context format detected.");
}

if($url->scheme eq 'file') {
my $path = $url->path;

# This is a relative path
if(substr($path, 0, 1) ne '/') {
$ic = $path;
}
# if it ends with a / then append .bindings
if(substr($ic,-1) eq '/') {
$ic .= '.bindings';
}
}

# Append .bindings to the URL, if it is a file based one that refers to a directory. The XMS client requires this

if ((substr($ic,0,8) eq 'file:///') && # If it starts with file:///
(substr($ic,-1) eq '/')) # and it ends with a /
{
$ic .= '.bindings'; # append .bindings
}
#if ((substr($ic,0,8) eq 'file:///') && # If it starts with file:///
#(substr($ic,-1) eq '/')) # and it ends with a /
#{
# $ic .= '.bindings'; # append .bindings
#}

%>
pInitialContext = new xms::String("<%=$ic %>");

std::string initContext = "<%=$ic %>";

// process relative file path
if(initContext.find("file:") == std::string::npos) {
streams_boost::filesystem::path filePath(initContext);

if(filePath.is_relative()) {
// if relative, convert to absolute path using the application directory as the base
filePath = streams_boost::filesystem::absolute(filePath, getPE().getApplicationDirectory());
initContext = filePath.string();
}

initContext = "file://" + initContext;
}

pInitialContext = new xms::String(initContext);
pConnectionFactory = new xms::String("<%=$conn->getAttributeByName('connection_factory'); %>");
<%

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,11 @@
*******************************************************************************/
package com.ibm.streamsx.messaging.jms;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand Down Expand Up @@ -192,11 +195,45 @@ public MessageClass getMessageType() {
return msgClass;

}

// Convert relative provider url path to absolute path for wmq only.
// non-absolute path should be relative to application directory.
// i.e file:./etc/ will be converted to applicationDir + ./etc/
private void convertProviderURLPath(File applicationDir) throws ParseConnectionDocumentException {

if(!isAMQ()) {

// provider_url can not be empty
if(this.providerURL == null || this.providerURL.trim().length() == 0) {
throw new ParseConnectionDocumentException("A value must be specified for provider_url attribute in connection document");
}

// provider_url has a value specified
try {
URL url = new URL(providerURL);

// We only care about url with file scheme.
if("file".equalsIgnoreCase(url.getProtocol())) {
String path = url.getPath();

// relative path is considered being relative to the application directory
if(!path.startsWith("/")) {
URL absProviderURL = new URL(url.getProtocol(), url.getHost(), applicationDir.getAbsolutePath() + File.separator + path);
this.providerURL = absProviderURL.toExternalForm();
}
}

} catch (MalformedURLException e) {
throw new ParseConnectionDocumentException("Invalid provider_url value detected: " + e.getMessage());
}

}
}

// subroutine to parse and validate the connection document
// called by both the JMSSink and JMSSource
public void parseAndValidateConnectionDocument(String connectionDocument, String connection, String access,
StreamSchema streamSchema, boolean isProducer) throws ParseConnectionDocumentException, SAXException,
StreamSchema streamSchema, boolean isProducer, File applicationDir) throws ParseConnectionDocumentException, SAXException,
IOException, ParserConfigurationException {
// validate the connections document against the xsd
validateConnectionsXML(connectionDocument);
Expand All @@ -217,6 +254,8 @@ public void parseAndValidateConnectionDocument(String connectionDocument, String
if (msgClass != MessageClass.empty) {
nativeSchemaChecks(isProducer, streamSchema, nativeSchema);
}
// convert provider_url to absolute if needed
convertProviderURLPath(applicationDir);
}

// subroutine to validate the connections document against the xsd
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ public synchronized void initialize(OperatorContext context)
ConnectionDocumentParser connectionDocumentParser = new ConnectionDocumentParser();

connectionDocumentParser.parseAndValidateConnectionDocument(
getConnectionDocument(), connection, access, streamSchema, true);
getConnectionDocument(), connection, access, streamSchema, true, context.getPE().getApplicationDirectory());

// codepage parameter can come only if message class is bytes
// Since the message class is extracted runtime during the parsing of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ public synchronized void initialize(OperatorContext context)
// the operator throws a runtime error and abort

connectionDocumentParser.parseAndValidateConnectionDocument(
getConnectionDocument(), connection, access, streamSchema, false);
getConnectionDocument(), connection, access, streamSchema, false, context.getPE().getApplicationDirectory());

// codepage parameter can come only if message class is bytes

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.ibm.streams.operator.Type;
import com.ibm.streams.operator.Type.MetaType;
import com.ibm.streams.operator.compile.OperatorContextChecker;
import com.ibm.streams.operator.log4j.LoggerNames;
import com.ibm.streams.operator.log4j.TraceLevel;
import com.ibm.streams.operator.model.Icons;
import com.ibm.streams.operator.model.InputPortSet;
Expand Down Expand Up @@ -79,8 +80,10 @@
@Libraries(value = {"opt/downloaded/*"} )
@Icons(location16="icons/MQTTSink_16.gif", location32="icons/MQTTSink_32.gif")
public class MqttSinkOperator extends AbstractMqttOperator implements StateHandler{

static Logger TRACE = Logger.getLogger(MqttSinkOperator.class);

private static final String CLASS_NAME = "com.ibm.streamsx.messaging.mqtt.MqttSinkOperator";
static Logger TRACE = Logger.getLogger(MqttSinkOperator.class);
static Logger LOGGER = Logger.getLogger(LoggerNames.LOG_FACILITY + "." + CLASS_NAME);

// Parameters
private String topic;
Expand Down Expand Up @@ -122,7 +125,14 @@ public void run() {
String dataAttributeName = getDataAttributeName() == null ? IMqttConstants.MQTT_DEFAULT_DATA_ATTRIBUTE_NAME : getDataAttributeName();

int dataAttrIndex = streamSchema.getAttributeIndex(dataAttributeName);
Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttributeName).getType().getMetaType();

// if neither dataAttributeName is specified or schema attribute named "data" can be found
// then it is assumed this schema contains only a single attribute and it is the data attribute
if(dataAttrIndex == -1) {
dataAttrIndex = 0;
}

Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttrIndex).getType().getMetaType();

boolean isBlob = false;
if(dataAttributeType.equals(MetaType.BLOB))
Expand Down Expand Up @@ -311,7 +321,7 @@ public static void checkConsistentRegion(OperatorContextChecker checker) {

// if there is a control port, a warning message is issued as control port is not supported in a consistent region
if(inputPorts.size() > 1) {
TRACE.warn("Having a control port in a consistent region is not supported. The control information may not be replayed, persisted and restored correctly. You may need to manually replay the control signals to bring the operator back to a consistent state.");
LOGGER.warn("Having a control port in a consistent region is not supported. The control information may not be replayed, persisted and restored correctly. You may need to manually replay the control signals to bring the operator back to a consistent state.");
}

if(cContext.isStartOfRegion()) {
Expand Down Expand Up @@ -381,17 +391,25 @@ public static void checkInputPortSchema(OperatorContextChecker checker) {

if (inputPorts.size() > 0)
{
// if user is not specifying dataAttributeName attribute, then we check if stream schema contains default data attribute
// if user is not specifying dataAttributeName attribute
// then we check if stream schema contains default data attribute
// or if schema contains only single attribute
if(!checker.getOperatorContext().getParameterNames().contains("dataAttributeName")) { //$NON-NLS-1$

StreamingInput<Tuple> dataPort = inputPorts.get(0);
StreamSchema streamSchema = dataPort.getStreamSchema();

Attribute data = streamSchema.getAttribute("data");


Attribute dataAttribute = null;
if(streamSchema.getAttributeCount() == 1) {
dataAttribute = streamSchema.getAttribute(0);
}
else {
dataAttribute = streamSchema.getAttribute("data");
}

// the default data attribute must be present and must be either BLOB or RSTRING
if(data != null) {
checker.checkAttributeType(data, MetaType.RSTRING, MetaType.BLOB );
if(dataAttribute != null) {
checker.checkAttributeType(dataAttribute, MetaType.RSTRING, MetaType.BLOB );
}
else {
checker.setInvalidContext(Messages.getString("Error_MqttSinkOperator.5"), new Object[]{}); //$NON-NLS-1$
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,11 +257,18 @@ public static void checkOutputPort(OperatorContextChecker checker) {
StreamingOutput<OutputTuple> dataPort = outputPorts.get(0);
StreamSchema streamSchema = dataPort.getStreamSchema();

Attribute data = streamSchema.getAttribute("data");
Attribute dataAttribute = null;

if(streamSchema.getAttributeCount() == 1) {
dataAttribute = streamSchema.getAttribute(0);
}
else {
dataAttribute = streamSchema.getAttribute("data");
}

// the default data attribute must be present and must be either BLOB or RSTRING
if(data != null) {
checker.checkAttributeType(data, MetaType.RSTRING, MetaType.BLOB );
if(dataAttribute != null) {
checker.checkAttributeType(dataAttribute, MetaType.RSTRING, MetaType.BLOB );
}
else {
checker.setInvalidContext(Messages.getString("Error_MqttSourceOperator.0"), new Object[]{}); //$NON-NLS-1$
Expand Down Expand Up @@ -549,7 +556,12 @@ private void produceTuples() throws Exception {
String dataAttributeName = this.getDataAttributeName() == null ? IMqttConstants.MQTT_DEFAULT_DATA_ATTRIBUTE_NAME : this.getDataAttributeName();

int dataAttrIndex = streamSchema.getAttributeIndex(dataAttributeName);
Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttributeName).getType().getMetaType();

if(dataAttrIndex == -1) {
dataAttrIndex = 0;
}

Type.MetaType dataAttributeType = streamSchema.getAttribute(dataAttrIndex).getType().getMetaType();

boolean isBlob = false;
if(dataAttributeType.equals(MetaType.BLOB))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ public class SPLDocConstants {
public static final String PARAM_KEYSTORE_DESC = "This optional parameter of type rstring specifies the file that contains the public and private key certificates of the MQTT client. If a relative path is specified, the path is relative to the application directory.";
public static final String PARAM_TRUSTORE_DESC = "This optional parameter of type rstring specifies the name of the file that contains the public certificate of the trusted MQTT server. If a relative path is specified, the path is relative to the application directory.";

// Common SPL Documnetation
// Common SPL Documnetation
public static final String MQTT_PARAM_CLIENT_ID_DESC = "All clients connected to the same server must have a unique ID. This optional parameter allows user to specify a client id to use when connecting to a MQTT provider. An ID will be generated by the operator if this parameter is not specified.";
public static final String MQTT_PARAM_USER_ID_DESC = "This optional parameter sets the user name to use for the connection. Must be specified when password parameter is used, or compile time error will occur";
public static final String MQTT_PARAM_PASSWORD_DESC = "This optional parameter sets the password to use for the connection. Must be specified when userID parameter is used, or compile time error will occur";
public static final String MQTT_PARAM_COMMAND_TIMEOUT_DESC = "This optional parameter is used to specify maximum time in millisecond to wait for an MQTT action to complete. A MQTT action can include connecting to a server, or publshing to a message. A value of 0 will cause the operator to wait indefinitely for an action to complete. A negative number will cause a runtime error. If unspecified, the default value for this parameter is 0.";
public static final String MQTT_PARAM_KEEP_ALIVE_INTERVAL_DESC = "This optional parameter, measured in seconds, sets the maximum time interval between messages sent or received. It enables the client to detect if the server is no longer available. By default, it is set to 60 seconds. A value of 0 will disable it. Negative number will cause a runtime error.";
public static final String MQTT_PARAM_DATA_ATTRIBUTE_DESC = "This optional parameter specifies the name of the attribute that is used to hold actual content of message, if not specified, default data attribute name is data";
}
public static final String MQTT_PARAM_DATA_ATTRIBUTE_DESC = "This optional parameter specifies the name of the attribute that is used to hold actual content of message, if not specified, in the case where multiple attributes are defined for the streams schema, the operator will look for attribute named data and use it as data attribute. In the case where the schema contains only a signle attribute, the operator will assume that the attribute is the data attribute";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ Error_MqttSinkOperator.2=CDIST1554E Unable to publish message
Error_MqttSinkOperator.0=CDIST1555E Topic or Qos not valid, unable to publish message. Topic: {0} Qos: {1}
Error_MqttSinkOperator.21=CDIST1556E Unable to process the control signal: {0}
Error_MqttSinkOperator.22=CDIST1569E Unable to connect to server.
Error_MqttSinkOperator.5=CDIST1565E Neither the parameter dataAttributeName is provided nor the default data attribute from input port 0 is found in stream schema.
Error_MqttSinkOperator.5=CDIST1565E Data attribute is not found from input port 0 in stream schema, a valid data attribute can be specified with one of following methods: indicate the data attribute name through operator attribute dataAttributeName, name one of the stream schema attribute as "data", in the case where the schema contains only a single attribute, it will be automatically considered the data attribute if its type is either Rstring or Blob.
Error_MqttSinkOperator.7=CDIST1557E At least one of the following attributes must be specified: topic, topicAttributeName
Error_MqttSinkOperator.3=CDIST1578E Timed out waiting for tuples to finish draining.
Error_MqttSourceOperator.0=CDIST1564E Neither the parameter dataAttributeName is provided nor the default data attribute from output port 0 is found in stream schema.
Error_MqttSourceOperator.0=CDIST1564E Data attribute is not found from output port 0 in stream schema, a valid data attribute can be specified with one of following methods: indicate the data attribute name through operator attribute dataAttributeName, name one of the stream schema attribute as "data", in the case where the schema contains only a single attribute, it will be automatically considered the data attribute if its type is either Rstring or Blob.
Error_MqttSourceOperator.24=CDIST1572E Control signal contains invalid qos, the signal is ignored. Topics: {0}, Qos: {1}
Error_MqttSourceOperator.25=CDIST1574E [Request Queue:] MQTT Client Error while handling MQTT client request: {0}
Error_MqttSourceOperator.26=CDIST1575E [Request Queue:] Runtime exception occurred while handling MQTT client requests: {0}
Expand Down
Loading

0 comments on commit 9919ece

Please sign in to comment.