diff --git a/device/transport/mqtt/src/mqtt.ts b/device/transport/mqtt/src/mqtt.ts index edb3f2849..0789cebf9 100644 --- a/device/transport/mqtt/src/mqtt.ts +++ b/device/transport/mqtt/src/mqtt.ts @@ -112,14 +112,14 @@ export class Mqtt extends EventEmitter implements Client.Transport { connect: (callback) => { this._fsm.transition('connecting', callback); }, - sendEvent: (topic, payload, options, sendEventCallback) => { + sendEvent: (message, sendEventCallback) => { /*Codes_SRS_NODE_DEVICE_MQTT_16_023: [The `sendEvent` method shall connect the Mqtt connection if it is disconnected.]*/ this._fsm.handle('connect', (err) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_024: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to establish a connection.]*/ sendEventCallback(translateError(err)); } else { - this._fsm.handle('sendEvent', topic, payload, options, sendEventCallback); + this._fsm.handle('sendEvent', message, sendEventCallback); } }); }, @@ -234,8 +234,41 @@ export class Mqtt extends EventEmitter implements Client.Transport { disconnect: (disconnectCallback) => { this._fsm.transition('disconnecting', disconnectCallback); }, - sendEvent: (topic, payload, options, sendEventCallback) => { - this._mqtt.publish(topic, payload, options, (err, result) => { + sendEvent: (message, sendEventCallback) => { + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices//messages/events/`.]*/ + let topic = this._topicTelemetryPublish; + let systemProperties: { [key: string]: string } = {}; + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_011: [The `sendEvent` method shall serialize the `messageId` property of the message as a key-value pair on the topic with the key `$.mid`.]*/ + if (message.messageId) systemProperties['$.mid'] = message.messageId; + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_012: [The `sendEvent` method shall serialize the `correlationId` property of the message as a key-value pair on the topic with the key `$.cid`.]*/ + if (message.correlationId) systemProperties['$.cid'] = message.correlationId; + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_013: [The `sendEvent` method shall serialize the `userId` property of the message as a key-value pair on the topic with the key `$.uid`.]*/ + if (message.userId) systemProperties['$.uid'] = message.userId; + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_014: [The `sendEvent` method shall serialize the `to` property of the message as a key-value pair on the topic with the key `$.to`.]*/ + if (message.to) systemProperties['$.to'] = message.to; + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_015: [The `sendEvent` method shall serialize the `expiryTimeUtc` property of the message as a key-value pair on the topic with the key `$.exp`.]*/ + if (message.expiryTimeUtc) { + const expiryString = message.expiryTimeUtc instanceof Date ? message.expiryTimeUtc.toISOString() : message.expiryTimeUtc; + systemProperties['$.exp'] = (expiryString || undefined); + } + /*Codes_SRS_NODE_DEVICE_MQTT_16_084: [The `sendEvent` method shall serialize the `contentType` property of the message as a key-value pair on the topic with the key `$.ct`.]*/ + if (message.contentType) systemProperties['$.ct'] = message.contentType; + /*Codes_SRS_NODE_DEVICE_MQTT_16_083: [The `sendEvent` method shall serialize the `contentEncoding` property of the message as a key-value pair on the topic with the key `$.ce`.]*/ + if (message.contentEncoding) systemProperties['$.ce'] = message.contentEncoding; + + const sysPropString = querystring.stringify(systemProperties); + topic += sysPropString; + + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_009: [If the message has properties, the property keys and values shall be uri-encoded, then serialized and appended at the end of the topic with the following convention: `=&=&=(...)`.]*/ + if (message.properties.count() > 0) { + for (let i = 0; i < message.properties.count(); i++) { + if (i > 0 || sysPropString) topic += '&'; + topic += encodeURIComponent(message.properties.propertyList[i].key) + '=' + encodeURIComponent(message.properties.propertyList[i].value); + } + } + + /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_010: [** The `sendEvent` method shall use QoS level of 1.]*/ + this._mqtt.publish(topic, message.data, { qos: 1, retain: false }, (err, result) => { if (err) { /*Codes_SRS_NODE_DEVICE_MQTT_16_027: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message.]*/ sendEventCallback(translateError(err)); @@ -351,40 +384,7 @@ export class Mqtt extends EventEmitter implements Client.Transport { sendEvent(message: Message, done?: (err?: Error, result?: any) => void): void { debug('sendEvent ' + JSON.stringify(message)); - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_008: [The `sendEvent` method shall use a topic formatted using the following convention: `devices//messages/events/`.]*/ - let topic = this._topicTelemetryPublish; - let systemProperties: { [key: string]: string } = {}; - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_011: [The `sendEvent` method shall serialize the `messageId` property of the message as a key-value pair on the topic with the key `$.mid`.]*/ - if (message.messageId) systemProperties['$.mid'] = message.messageId; - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_012: [The `sendEvent` method shall serialize the `correlationId` property of the message as a key-value pair on the topic with the key `$.cid`.]*/ - if (message.correlationId) systemProperties['$.cid'] = message.correlationId; - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_013: [The `sendEvent` method shall serialize the `userId` property of the message as a key-value pair on the topic with the key `$.uid`.]*/ - if (message.userId) systemProperties['$.uid'] = message.userId; - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_014: [The `sendEvent` method shall serialize the `to` property of the message as a key-value pair on the topic with the key `$.to`.]*/ - if (message.to) systemProperties['$.to'] = message.to; - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_015: [The `sendEvent` method shall serialize the `expiryTimeUtc` property of the message as a key-value pair on the topic with the key `$.exp`.]*/ - if (message.expiryTimeUtc) { - const expiryString = message.expiryTimeUtc instanceof Date ? message.expiryTimeUtc.toISOString() : message.expiryTimeUtc; - systemProperties['$.exp'] = (expiryString || undefined); - } - /*Codes_SRS_NODE_DEVICE_MQTT_16_084: [The `sendEvent` method shall serialize the `contentType` property of the message as a key-value pair on the topic with the key `$.ct`.]*/ - if (message.contentType) systemProperties['$.ct'] = message.contentType; - /*Codes_SRS_NODE_DEVICE_MQTT_16_083: [The `sendEvent` method shall serialize the `contentEncoding` property of the message as a key-value pair on the topic with the key `$.ce`.]*/ - if (message.contentEncoding) systemProperties['$.ce'] = message.contentEncoding; - - const sysPropString = querystring.stringify(systemProperties); - topic += sysPropString; - - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_009: [If the message has properties, the property keys and values shall be uri-encoded, then serialized and appended at the end of the topic with the following convention: `=&=&=(...)`.]*/ - if (message.properties.count() > 0) { - for (let i = 0; i < message.properties.count(); i++) { - if (i > 0 || sysPropString) topic += '&'; - topic += encodeURIComponent(message.properties.propertyList[i].key) + '=' + encodeURIComponent(message.properties.propertyList[i].value); - } - } - - /*Codes_SRS_NODE_COMMON_MQTT_BASE_16_010: [** The `sendEvent` method shall use QoS level of 1.]*/ - this._fsm.handle('sendEvent', topic, message.data, { qos: 1, retain: false }, (err, puback) => { + this._fsm.handle('sendEvent', message, (err, puback) => { if (err) { debug('send error: ' + err.toString()); done(err); diff --git a/device/transport/mqtt/test/_mqtt_test.js b/device/transport/mqtt/test/_mqtt_test.js index 107e339cf..8531323a3 100644 --- a/device/transport/mqtt/test/_mqtt_test.js +++ b/device/transport/mqtt/test/_mqtt_test.js @@ -111,6 +111,7 @@ describe('Mqtt', function () { transport.sendEvent(new Message('test'), function () { assert.isTrue(fakeMqttBase.connect.calledOnce); assert.isTrue(fakeMqttBase.publish.calledOnce); + assert.strictEqual(fakeMqttBase.publish.firstCall.args[0], 'devices/deviceId/messages/events/'); testCallback(); }); });