Skip to content

Commit

Permalink
Mqtt5 final staging (#377)
Browse files Browse the repository at this point in the history
* Mqtt5 support: module, client, bindings, tests, samples, user guide
* Reworked modules internally. Exported module contract should remain equivalent.
  • Loading branch information
bretambrose authored Dec 5, 2022
1 parent 3c17954 commit 735c66a
Show file tree
Hide file tree
Showing 70 changed files with 13,938 additions and 1,902 deletions.
16 changes: 0 additions & 16 deletions .builder/actions/crt-ci-prep.py

This file was deleted.

55 changes: 55 additions & 0 deletions .builder/actions/crt-ci-test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import Builder
import json
import os
import re
import subprocess
import sys
import tempfile

class CrtCiTest(Builder.Action):

def _write_environment_script_secret_to_env(self, env, secret_name):
mqtt5_ci_environment_script = env.shell.get_secret(secret_name)
env_line = re.compile('^export\s+(\w+)=(.+)')

lines = mqtt5_ci_environment_script.splitlines()
for line in lines:
env_pair_match = env_line.match(line)
if env_pair_match.group(1) and env_pair_match.group(2):
env.shell.setenv(env_pair_match.group(1), env_pair_match.group(2))

def _write_secret_to_temp_file(self, env, secret_name):
secret_value = env.shell.get_secret(secret_name)

fd, filename = tempfile.mkstemp()
os.write(fd, str.encode(secret_value))
os.close(fd)

return filename

def run(self, env):
env.shell.setenv("AWS_TESTING_COGNITO_IDENTITY", env.shell.get_secret("aws-c-auth-testing/cognito-identity"))

self._write_environment_script_secret_to_env(env, "mqtt5-testing/github-ci-environment")

# Unfortunately, we can't use NamedTemporaryFile and a with-block because NamedTemporaryFile is not readable
# on Windows.
try:
cert_file_name = self._write_secret_to_temp_file(env, "unit-test/certificate")
key_file_name = self._write_secret_to_temp_file(env, "unit-test/privatekey")

env.shell.setenv("AWS_TEST_MQTT5_IOT_CORE_CERTIFICATE_PATH", cert_file_name)
env.shell.setenv("AWS_TEST_MQTT5_IOT_CORE_KEY_PATH", key_file_name)

if os.system("npm run test:native"):
# Failed
actions.append("exit 1")
finally:
if cert_file_name:
os.remove(cert_file_name)
if key_file_name:
os.remove(key_file_name)

actions = []

return Builder.Script(actions, name='crt-ci-test')
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,8 @@ deps_build/

# docs are updated automatically by .github/workflows/docs.yml
docs/

# samples shouldn't have package-lock committed
samples/browser/**/package-lock.json
samples/node/**/package-lock.json
canary/mqtt5/package-lock.json
400 changes: 400 additions & 0 deletions MQTT5-UserGuide.md

Large diffs are not rendered by default.

7 changes: 1 addition & 6 deletions builder.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,7 @@
"AWS_CRT_MEMORY_TRACING": "2"
},
"test_steps": [
"crt-ci-prep",
[
"npm",
"run",
"test:native"
],
"crt-ci-test",
[
"npm",
"--prefix",
Expand Down
221 changes: 221 additions & 0 deletions canary/mqtt5/canary.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/**
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
* SPDX-License-Identifier: Apache-2.0.
*/

import {ICrtError, mqtt5, mqtt5_packet} from "aws-crt";
import {once} from "events";
import {v4 as uuid} from "uuid";
var weightedRandom = require('weighted-random');

type Args = { [index: string]: any };

const yargs = require('yargs');

yargs.command('*', false, (yargs: any) => {
yargs.option('duration', {
description: 'INT: time in seconds to run the canary',
type: 'number',
default: 3600,
})
}, main).parse();

let RECEIVED_TOPIC : string = "Canary/Received/Topic";

interface CanaryMqttStatistics {
clientsUsed : number;
publishesReceived: number;
subscribesAttempted : number;
subscribesSucceeded : number;
subscribesFailed : number;
unsubscribesAttempted : number;
unsubscribesSucceeded : number;
unsubscribesFailed : number;
publishesAttempted : number;
publishesSucceeded : number;
publishesFailed : number;
}

interface CanaryContext {
client : mqtt5.Mqtt5Client;

mqttStats : CanaryMqttStatistics;

subscriptions: string[];
}

function createCanaryClient(mqttStats : CanaryMqttStatistics) : mqtt5.Mqtt5Client {
const client_config : mqtt5.Mqtt5ClientConfig = {
hostName : process.env.AWS_TEST_MQTT5_DIRECT_MQTT_HOST ?? "localhost",
port : parseInt(process.env.AWS_TEST_MQTT5_DIRECT_MQTT_PORT ?? "0")
};

let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);

client.on('error', (error: ICrtError) => {});
client.on("messageReceived",(message: mqtt5_packet.PublishPacket) : void => {
mqttStats.publishesReceived++;
});

return client;
}

async function doSubscribe(context : CanaryContext) {
let topicFilter: string = `Mqtt5/Canary/RandomSubscribe${uuid()}`;

try {
context.mqttStats.subscribesAttempted++;

await context.client.subscribe({
subscriptions: [
{topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce}
]
});

context.subscriptions.push(topicFilter);
context.mqttStats.subscribesSucceeded++;
} catch (err) {
context.mqttStats.subscribesFailed++;
context.subscriptions.filter(entry => entry !== topicFilter);
}
}

async function doUnsubscribe(context : CanaryContext) {
if (context.subscriptions.length == 0) {
return;
}

let topicFilter: string = context.subscriptions.pop() ?? "canthappen";

try {
context.mqttStats.unsubscribesAttempted++;

await context.client.unsubscribe({
topicFilters: [ topicFilter ]
});

context.mqttStats.unsubscribesSucceeded++;
} catch (err) {
context.mqttStats.unsubscribesFailed++;
context.subscriptions.push(topicFilter);
}
}

async function doPublish(context : CanaryContext, qos: mqtt5_packet.QoS) {
try {
context.mqttStats.publishesAttempted++;

await context.client.publish({
topicName: RECEIVED_TOPIC,
qos: qos,
payload: Buffer.alloc(10000),
retain: false,
payloadFormat: mqtt5_packet.PayloadFormatIndicator.Utf8,
messageExpiryIntervalSeconds: 60,
responseTopic: "talk/to/me",
correlationData: Buffer.alloc(3000),
contentType: "not-json",
userProperties: [
{name: "name", value: "value"}
]
});

context.mqttStats.publishesSucceeded++;
} catch (err) {
context.mqttStats.publishesFailed++;
}
}

async function runCanaryIteration(endTime: Date, mqttStats : CanaryMqttStatistics) {

let context : CanaryContext = {
client : createCanaryClient(mqttStats),
mqttStats : mqttStats,
subscriptions : []
};

mqttStats.clientsUsed++;

let operationTable = [
{ weight : 1, op: async () => { await doSubscribe(context); }},
{ weight : 1, op: async () => { await doUnsubscribe(context); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtMostOnce); }},
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtLeastOnce); }}
];

var weightedOperations = operationTable.map(function (operation) {
return operation.weight;
});

const connectionSuccess = once(context.client, "connectionSuccess");

context.client.start();

await connectionSuccess;

await context.client.subscribe({
subscriptions: [
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce }
]
});

let currentTime : Date = new Date();
while (currentTime.getTime() < endTime.getTime()) {
let index : number = weightedRandom(weightedOperations);

await (operationTable[index].op)();

currentTime = new Date();
}

const stopped = once(context.client, "stopped");

context.client.stop();

await stopped;

context.client.close();
}

async function runCanary(durationInSeconds: number, mqttStats : CanaryMqttStatistics) {
let startTime: Date = new Date();
let currentTime: Date = startTime;
let secondsElapsed : number = 0;
let iteration : number = 0;

while (secondsElapsed < durationInSeconds) {
let iterationTime : number = Math.min(durationInSeconds - secondsElapsed, 60);
let iterationEnd = new Date(currentTime.getTime() + iterationTime * 1000);
await runCanaryIteration(iterationEnd, mqttStats);

iteration++;
console.log(`Iteration ${iteration} stats: ${JSON.stringify(mqttStats)}`);

currentTime = new Date();
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
}
}

async function main(args : Args){
let mqttStats : CanaryMqttStatistics = {
clientsUsed : 0,
publishesReceived: 0,
subscribesAttempted : 0,
subscribesSucceeded : 0,
subscribesFailed : 0,
unsubscribesAttempted : 0,
unsubscribesSucceeded : 0,
unsubscribesFailed : 0,
publishesAttempted : 0,
publishesSucceeded : 0,
publishesFailed : 0
};

await runCanary(args.duration, mqttStats);

console.log(`Final Stats: ${JSON.stringify(mqttStats)}`)

process.exit(0);

}

34 changes: 34 additions & 0 deletions canary/mqtt5/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"name": "canary",
"version": "1.0.0",
"description": "test",
"main": "./dist/canary.js",
"scripts": {
"test": "tsc && node ./dist/canary.js",
"install": "tsc"
},
"repository": {
"type": "git",
"url": "git+https://github.com/awslabs/aws-crt-nodejs.git"
},
"keywords": [
"aws",
"native",
"http"
],
"author": "AWS Common Runtime Team <aws-sdk-common-runtime@amazon.com>",
"license": "Apache-2.0",
"bugs": {
"url": "https://github.com/awslabs/aws-crt-nodejs/issues"
},
"homepage": "https://github.com/awslabs/aws-crt-nodejs#readme",
"devDependencies": {
"@types/node": "^10.17.17",
"typescript": "^3.8.3"
},
"dependencies": {
"aws-crt": "file:../../",
"yargs": "^17.2.1",
"weighted-random": "0.1.0"
}
}
Loading

0 comments on commit 735c66a

Please sign in to comment.