Skip to content

Commit a91e55a

Browse files
authored
Setup Short Run Mqtt5 Canary (#503)
1 parent b294263 commit a91e55a

File tree

6 files changed

+1868
-109
lines changed

6 files changed

+1868
-109
lines changed

canary/mqtt5/canary.ts

Lines changed: 163 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0.
44
*/
55

6-
import {ICrtError, mqtt5, mqtt5_packet} from "aws-crt";
6+
import {ICrtError, mqtt5} from "aws-crt";
77
import {once} from "events";
88
import {v4 as uuid} from "uuid";
99
var weightedRandom = require('weighted-random');
@@ -13,207 +13,262 @@ type Args = { [index: string]: any };
1313
const yargs = require('yargs');
1414

1515
yargs.command('*', false, (yargs: any) => {
16-
yargs.option('duration', {
17-
description: 'INT: time in seconds to run the canary',
18-
type: 'number',
19-
default: 3600,
20-
})
16+
yargs.option({
17+
'duration': {
18+
description: 'INT: time in seconds to run the canary',
19+
type: 'number',
20+
default: 120,
21+
},
22+
'endpoint': {
23+
description: 'STR: endpoint to connect to',
24+
type: 'string',
25+
default: 'localhost',
26+
},
27+
'port': {
28+
description: 'INT: port to connect to',
29+
type: 'number',
30+
default: 1883,
31+
},
32+
'tps': {
33+
description: 'INT: transaction per second',
34+
type: 'number',
35+
default: 0,
36+
},
37+
'clients': {
38+
description: 'INT: concurrent running clients',
39+
type: 'number',
40+
default: 10,
41+
}
42+
});
2143
}, main).parse();
2244

23-
let RECEIVED_TOPIC : string = "Canary/Received/Topic";
45+
let RECEIVED_TOPIC: string = "Canary/Received/Topic";
2446

2547
interface CanaryMqttStatistics {
26-
clientsUsed : number;
48+
clientsUsed: number;
2749
publishesReceived: number;
28-
subscribesAttempted : number;
29-
subscribesSucceeded : number;
30-
subscribesFailed : number;
31-
unsubscribesAttempted : number;
32-
unsubscribesSucceeded : number;
33-
unsubscribesFailed : number;
34-
publishesAttempted : number;
35-
publishesSucceeded : number;
36-
publishesFailed : number;
50+
subscribesAttempted: number;
51+
subscribesSucceeded: number;
52+
subscribesFailed: number;
53+
unsubscribesAttempted: number;
54+
unsubscribesSucceeded: number;
55+
unsubscribesFailed: number;
56+
publishesAttempted: number;
57+
publishesSucceeded: number;
58+
publishesFailed: number;
59+
totalOperation: number;
60+
}
61+
62+
interface TestContext {
63+
duration: number;
64+
hostname: string;
65+
port: number;
66+
tps_sleep_time: number;
67+
clients: number;
3768
}
3869

3970
interface CanaryContext {
40-
client : mqtt5.Mqtt5Client;
71+
clients: mqtt5.Mqtt5Client[];
72+
73+
mqttStats: CanaryMqttStatistics;
74+
75+
subscriptions: string[][];
76+
}
4177

42-
mqttStats : CanaryMqttStatistics;
78+
function sleep(millisecond: number) {
79+
return new Promise((resolve) => setInterval(resolve, millisecond));
80+
}
4381

44-
subscriptions: string[];
82+
function getRandomIndex(clients : mqtt5.Mqtt5Client[]): number
83+
{
84+
return Math.floor(Math.random() * clients.length);
4585
}
4686

47-
function createCanaryClient(mqttStats : CanaryMqttStatistics) : mqtt5.Mqtt5Client {
48-
const client_config : mqtt5.Mqtt5ClientConfig = {
49-
hostName : process.env.AWS_TEST_MQTT5_DIRECT_MQTT_HOST ?? "localhost",
50-
port : parseInt(process.env.AWS_TEST_MQTT5_DIRECT_MQTT_PORT ?? "0")
87+
function createCanaryClients(testContext: TestContext, mqttStats: CanaryMqttStatistics): mqtt5.Mqtt5Client[] {
88+
const client_config: mqtt5.Mqtt5ClientConfig = {
89+
hostName: testContext.hostname,
90+
port: testContext.port
5191
};
5292

53-
let client : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);
93+
const clients = [];
5494

55-
client.on('error', (error: ICrtError) => {});
56-
client.on("messageReceived",(message: mqtt5_packet.PublishPacket) : void => {
57-
mqttStats.publishesReceived++;
58-
});
95+
for (let i = 0; i < testContext.clients; i++) {
96+
let client: mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(client_config);
97+
98+
client.on('error', (error: ICrtError) => { });
99+
client.on("messageReceived", (eventData: mqtt5.MessageReceivedEvent): void => {
100+
mqttStats.publishesReceived++;
101+
});
102+
103+
++mqttStats.clientsUsed;
59104

60-
return client;
105+
clients.push(client);
106+
}
107+
108+
return clients;
61109
}
62110

63-
async function doSubscribe(context : CanaryContext) {
111+
async function doSubscribe(context: CanaryContext) {
64112
let topicFilter: string = `Mqtt5/Canary/RandomSubscribe${uuid()}`;
65113

114+
let index = getRandomIndex(context.clients);
66115
try {
67116
context.mqttStats.subscribesAttempted++;
68117

69-
await context.client.subscribe({
118+
await context.clients[index].subscribe({
70119
subscriptions: [
71-
{topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce}
120+
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
72121
]
73122
});
74123

75-
context.subscriptions.push(topicFilter);
76-
context.mqttStats.subscribesSucceeded++;
77124
} catch (err) {
78125
context.mqttStats.subscribesFailed++;
79-
context.subscriptions.filter(entry => entry !== topicFilter);
126+
return;
80127
}
128+
129+
context.subscriptions[index].push(topicFilter);
130+
context.mqttStats.subscribesSucceeded++;
81131
}
82132

83-
async function doUnsubscribe(context : CanaryContext) {
84-
if (context.subscriptions.length == 0) {
133+
async function doUnsubscribe(context: CanaryContext) {
134+
let index = getRandomIndex(context.clients);
135+
if (context.subscriptions[index].length == 0) {
85136
return;
86137
}
87-
88-
let topicFilter: string = context.subscriptions.pop() ?? "canthappen";
138+
let topicFilter: string = context.subscriptions[index].pop() ?? "canthappen";
89139

90140
try {
91141
context.mqttStats.unsubscribesAttempted++;
92142

93-
await context.client.unsubscribe({
94-
topicFilters: [ topicFilter ]
143+
await context.clients[index].unsubscribe({
144+
topicFilters: [topicFilter]
95145
});
96146

97147
context.mqttStats.unsubscribesSucceeded++;
98148
} catch (err) {
99149
context.mqttStats.unsubscribesFailed++;
100-
context.subscriptions.push(topicFilter);
150+
context.subscriptions[index].push(topicFilter);
101151
}
102152
}
103153

104-
async function doPublish(context : CanaryContext, qos: mqtt5_packet.QoS) {
154+
async function doPublish(context: CanaryContext, qos: mqtt5.QoS) {
105155
try {
106156
context.mqttStats.publishesAttempted++;
107157

108-
await context.client.publish({
158+
// Generate random binary payload data
159+
let payload = Buffer.alloc(10000, 'a', "utf-8");
160+
let index = getRandomIndex(context.clients);
161+
await context.clients[index].publish({
109162
topicName: RECEIVED_TOPIC,
110163
qos: qos,
111-
payload: Buffer.alloc(10000),
164+
payload: payload,
112165
retain: false,
113-
payloadFormat: mqtt5_packet.PayloadFormatIndicator.Utf8,
166+
payloadFormat: mqtt5.PayloadFormatIndicator.Utf8,
114167
messageExpiryIntervalSeconds: 60,
115168
responseTopic: "talk/to/me",
116169
correlationData: Buffer.alloc(3000),
117170
contentType: "not-json",
118171
userProperties: [
119-
{name: "name", value: "value"}
172+
{ name: "name", value: "value" }
120173
]
121174
});
122175

123176
context.mqttStats.publishesSucceeded++;
124177
} catch (err) {
125178
context.mqttStats.publishesFailed++;
179+
console.log("Publish Failed with " + err);
126180
}
127181
}
128182

129-
async function runCanaryIteration(endTime: Date, mqttStats : CanaryMqttStatistics) {
183+
async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatistics) {
184+
let startTime: Date = new Date();
185+
let currentTime: Date = startTime;
186+
let secondsElapsed: number = 0;
130187

131-
let context : CanaryContext = {
132-
client : createCanaryClient(mqttStats),
133-
mqttStats : mqttStats,
134-
subscriptions : []
188+
let context: CanaryContext = {
189+
clients: createCanaryClients(testContext, mqttStats),
190+
mqttStats: mqttStats,
191+
subscriptions: []
135192
};
136193

137-
mqttStats.clientsUsed++;
194+
// Start clients
195+
context.clients.forEach( async client => {
196+
client.start();
197+
const connectionSuccess = once(client, "connectionSuccess");
198+
199+
await connectionSuccess;
200+
201+
await client.subscribe({
202+
subscriptions: [
203+
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5.QoS.AtLeastOnce }
204+
]
205+
});
206+
// setup empty subscription string array
207+
context.subscriptions.push(new Array());
208+
});
138209

139210
let operationTable = [
140211
{ weight : 1, op: async () => { await doSubscribe(context); }},
141212
{ weight : 1, op: async () => { await doUnsubscribe(context); }},
142-
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtMostOnce); }},
143-
{ weight : 20, op: async () => { await doPublish(context, mqtt5_packet.QoS.AtLeastOnce); }}
213+
{ weight : 20, op: async () => { await doPublish(context, mqtt5.QoS.AtMostOnce); }},
214+
{ weight : 20, op: async () => { await doPublish(context, mqtt5.QoS.AtLeastOnce); }}
144215
];
145216

146217
var weightedOperations = operationTable.map(function (operation) {
147218
return operation.weight;
148219
});
149220

150-
const connectionSuccess = once(context.client, "connectionSuccess");
151-
152-
context.client.start();
153-
154-
await connectionSuccess;
221+
while (secondsElapsed < testContext.duration) {
155222

156-
await context.client.subscribe({
157-
subscriptions: [
158-
{ topicFilter: RECEIVED_TOPIC, qos: mqtt5_packet.QoS.AtLeastOnce }
159-
]
160-
});
161-
162-
let currentTime : Date = new Date();
163-
while (currentTime.getTime() < endTime.getTime()) {
164-
let index : number = weightedRandom(weightedOperations);
223+
let index: number = weightedRandom(weightedOperations);
165224

166225
await (operationTable[index].op)();
167-
226+
++context.mqttStats.totalOperation;
227+
await sleep(testContext.tps_sleep_time);
168228
currentTime = new Date();
169-
}
170-
171-
const stopped = once(context.client, "stopped");
172-
173-
context.client.stop();
174-
175-
await stopped;
176-
177-
context.client.close();
178-
}
179229

180-
async function runCanary(durationInSeconds: number, mqttStats : CanaryMqttStatistics) {
181-
let startTime: Date = new Date();
182-
let currentTime: Date = startTime;
183-
let secondsElapsed : number = 0;
184-
let iteration : number = 0;
230+
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
231+
}
185232

186-
while (secondsElapsed < durationInSeconds) {
187-
let iterationTime : number = Math.min(durationInSeconds - secondsElapsed, 60);
188-
let iterationEnd = new Date(currentTime.getTime() + iterationTime * 1000);
189-
await runCanaryIteration(iterationEnd, mqttStats);
190233

191-
iteration++;
192-
console.log(`Iteration ${iteration} stats: ${JSON.stringify(mqttStats)}`);
234+
// Stop and close clients
235+
context.clients.forEach( async client => {
236+
const stopped = once(client, "stopped");
237+
client.stop();
238+
await stopped;
239+
client.close();
240+
});
193241

194-
currentTime = new Date();
195-
secondsElapsed = (currentTime.getTime() - startTime.getTime()) / 1000;
196-
}
197242
}
198243

199244
async function main(args : Args){
200-
let mqttStats : CanaryMqttStatistics = {
201-
clientsUsed : 0,
245+
let mqttStats : CanaryMqttStatistics = {
246+
clientsUsed: 0,
202247
publishesReceived: 0,
203-
subscribesAttempted : 0,
204-
subscribesSucceeded : 0,
205-
subscribesFailed : 0,
206-
unsubscribesAttempted : 0,
207-
unsubscribesSucceeded : 0,
208-
unsubscribesFailed : 0,
209-
publishesAttempted : 0,
210-
publishesSucceeded : 0,
211-
publishesFailed : 0
248+
subscribesAttempted: 0,
249+
subscribesSucceeded: 0,
250+
subscribesFailed: 0,
251+
unsubscribesAttempted: 0,
252+
unsubscribesSucceeded: 0,
253+
unsubscribesFailed: 0,
254+
publishesAttempted: 0,
255+
publishesSucceeded: 0,
256+
publishesFailed: 0,
257+
totalOperation: 0,
212258
};
213259

214-
await runCanary(args.duration, mqttStats);
260+
let testContext: TestContext = {
261+
duration: args.duration,
262+
hostname: args.endpoint,
263+
port: args.port,
264+
tps_sleep_time: args.tps == 0 ? 0 : (1000 / args.tps),
265+
clients: args.clients,
266+
}
267+
268+
await runCanary(testContext, mqttStats);
215269

216-
console.log(`Final Stats: ${JSON.stringify(mqttStats)}`)
270+
console.log(`Final Stats: ${JSON.stringify(mqttStats)}`);
271+
console.log(`Operation TPS: ${mqttStats.totalOperation / testContext.duration}`);
217272

218273
process.exit(0);
219274

canary/mqtt5/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
"homepage": "https://github.com/awslabs/aws-crt-nodejs#readme",
2525
"devDependencies": {
2626
"@types/node": "^10.17.17",
27-
"typescript": "^3.8.3"
27+
"typescript": "^4.7.4"
2828
},
2929
"dependencies": {
3030
"aws-crt": "file:../../",

0 commit comments

Comments
 (0)