Skip to content

Commit

Permalink
fix(mqtt): Fix computation of topic when sending a message while disc…
Browse files Browse the repository at this point in the history
…onnected
  • Loading branch information
Pierre Cauchois committed Jun 7, 2018
1 parent dbe365a commit 961c61d
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 38 deletions.
76 changes: 38 additions & 38 deletions device/transport/mqtt/src/mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ export class Mqtt extends EventEmitter implements Client.Transport {
connect: (callback) => {
this._fsm.transition('connecting', callback);
},
sendEvent: (topic, payload, options, sendEventCallback) => {
sendEvent: (message, sendEventCallback) => {
/*Codes_SRS_NODE_DEVICE_MQTT_16_023: [The `sendEvent` method shall connect the Mqtt connection if it is disconnected.]*/
this._fsm.handle('connect', (err) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_024: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection.]*/
sendEventCallback(translateError(err));
} else {
this._fsm.handle('sendEvent', topic, payload, options, sendEventCallback);
this._fsm.handle('sendEvent', message, sendEventCallback);
}
});
},
Expand Down Expand Up @@ -234,8 +234,41 @@ export class Mqtt extends EventEmitter implements Client.Transport {
disconnect: (disconnectCallback) => {
this._fsm.transition('disconnecting', disconnectCallback);
},
sendEvent: (topic, payload, options, sendEventCallback) => {
this._mqtt.publish(topic, payload, options, (err, result) => {
sendEvent: (message, sendEventCallback) => {
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`.]*/
let topic = this._topicTelemetryPublish;
let systemProperties: { [key: string]: string } = {};
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_011: [The `sendEvent` method shall serialize the `messageId` property of the message as a key-value pair on the topic with the key `$.mid`.]*/
if (message.messageId) systemProperties['$.mid'] = message.messageId;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_012: [The `sendEvent` method shall serialize the `correlationId` property of the message as a key-value pair on the topic with the key `$.cid`.]*/
if (message.correlationId) systemProperties['$.cid'] = message.correlationId;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_013: [The `sendEvent` method shall serialize the `userId` property of the message as a key-value pair on the topic with the key `$.uid`.]*/
if (message.userId) systemProperties['$.uid'] = message.userId;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_014: [The `sendEvent` method shall serialize the `to` property of the message as a key-value pair on the topic with the key `$.to`.]*/
if (message.to) systemProperties['$.to'] = message.to;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_015: [The `sendEvent` method shall serialize the `expiryTimeUtc` property of the message as a key-value pair on the topic with the key `$.exp`.]*/
if (message.expiryTimeUtc) {
const expiryString = message.expiryTimeUtc instanceof Date ? message.expiryTimeUtc.toISOString() : message.expiryTimeUtc;
systemProperties['$.exp'] = (expiryString || undefined);
}
/*Codes_SRS_NODE_DEVICE_MQTT_16_084: [The `sendEvent` method shall serialize the `contentType` property of the message as a key-value pair on the topic with the key `$.ct`.]*/
if (message.contentType) systemProperties['$.ct'] = <string>message.contentType;
/*Codes_SRS_NODE_DEVICE_MQTT_16_083: [The `sendEvent` method shall serialize the `contentEncoding` property of the message as a key-value pair on the topic with the key `$.ce`.]*/
if (message.contentEncoding) systemProperties['$.ce'] = <string>message.contentEncoding;

const sysPropString = querystring.stringify(systemProperties);
topic += sysPropString;

/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_009: [If the message has properties, the property keys and values shall be uri-encoded, then serialized and appended at the end of the topic with the following convention: `<key>=<value>&<key2>=<value2>&<key3>=<value3>(...)`.]*/
if (message.properties.count() > 0) {
for (let i = 0; i < message.properties.count(); i++) {
if (i > 0 || sysPropString) topic += '&';
topic += encodeURIComponent(message.properties.propertyList[i].key) + '=' + encodeURIComponent(message.properties.propertyList[i].value);
}
}

/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_010: [** The `sendEvent` method shall use QoS level of 1.]*/
this._mqtt.publish(topic, message.data, { qos: 1, retain: false }, (err, result) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_027: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message.]*/
sendEventCallback(translateError(err));
Expand Down Expand Up @@ -351,40 +384,7 @@ export class Mqtt extends EventEmitter implements Client.Transport {
sendEvent(message: Message, done?: (err?: Error, result?: any) => void): void {
debug('sendEvent ' + JSON.stringify(message));

/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices/<deviceId>/messages/events/`.]*/
let topic = this._topicTelemetryPublish;
let systemProperties: { [key: string]: string } = {};
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_011: [The `sendEvent` method shall serialize the `messageId` property of the message as a key-value pair on the topic with the key `$.mid`.]*/
if (message.messageId) systemProperties['$.mid'] = message.messageId;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_012: [The `sendEvent` method shall serialize the `correlationId` property of the message as a key-value pair on the topic with the key `$.cid`.]*/
if (message.correlationId) systemProperties['$.cid'] = message.correlationId;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_013: [The `sendEvent` method shall serialize the `userId` property of the message as a key-value pair on the topic with the key `$.uid`.]*/
if (message.userId) systemProperties['$.uid'] = message.userId;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_014: [The `sendEvent` method shall serialize the `to` property of the message as a key-value pair on the topic with the key `$.to`.]*/
if (message.to) systemProperties['$.to'] = message.to;
/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_015: [The `sendEvent` method shall serialize the `expiryTimeUtc` property of the message as a key-value pair on the topic with the key `$.exp`.]*/
if (message.expiryTimeUtc) {
const expiryString = message.expiryTimeUtc instanceof Date ? message.expiryTimeUtc.toISOString() : message.expiryTimeUtc;
systemProperties['$.exp'] = (expiryString || undefined);
}
/*Codes_SRS_NODE_DEVICE_MQTT_16_084: [The `sendEvent` method shall serialize the `contentType` property of the message as a key-value pair on the topic with the key `$.ct`.]*/
if (message.contentType) systemProperties['$.ct'] = <string>message.contentType;
/*Codes_SRS_NODE_DEVICE_MQTT_16_083: [The `sendEvent` method shall serialize the `contentEncoding` property of the message as a key-value pair on the topic with the key `$.ce`.]*/
if (message.contentEncoding) systemProperties['$.ce'] = <string>message.contentEncoding;

const sysPropString = querystring.stringify(systemProperties);
topic += sysPropString;

/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_009: [If the message has properties, the property keys and values shall be uri-encoded, then serialized and appended at the end of the topic with the following convention: `<key>=<value>&<key2>=<value2>&<key3>=<value3>(...)`.]*/
if (message.properties.count() > 0) {
for (let i = 0; i < message.properties.count(); i++) {
if (i > 0 || sysPropString) topic += '&';
topic += encodeURIComponent(message.properties.propertyList[i].key) + '=' + encodeURIComponent(message.properties.propertyList[i].value);
}
}

/*Codes_SRS_NODE_COMMON_MQTT_BASE_16_010: [** The `sendEvent` method shall use QoS level of 1.]*/
this._fsm.handle('sendEvent', topic, message.data, { qos: 1, retain: false }, (err, puback) => {
this._fsm.handle('sendEvent', message, (err, puback) => {
if (err) {
debug('send error: ' + err.toString());
done(err);
Expand Down
1 change: 1 addition & 0 deletions device/transport/mqtt/test/_mqtt_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ describe('Mqtt', function () {
transport.sendEvent(new Message('test'), function () {
assert.isTrue(fakeMqttBase.connect.calledOnce);
assert.isTrue(fakeMqttBase.publish.calledOnce);
assert.strictEqual(fakeMqttBase.publish.firstCall.args[0], 'devices/deviceId/messages/events/');
testCallback();
});
});
Expand Down

0 comments on commit 961c61d

Please sign in to comment.