Skip to content

Commit

Permalink
Merge branch 'main' into Mqtt5GA
Browse files Browse the repository at this point in the history
  • Loading branch information
bretambrose authored Dec 4, 2023
2 parents 987d062 + 2559252 commit 05c3c6a
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 11 deletions.
13 changes: 6 additions & 7 deletions canary/mqtt5/canary.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ interface CanaryContext {
}

function sleep(millisecond: number) {
return new Promise((resolve) => setInterval(resolve, millisecond));
return new Promise((resolve) => setTimeout(resolve, millisecond));
}

function getRandomIndex(clients : mqtt5.Mqtt5Client[]): number
Expand Down Expand Up @@ -192,7 +192,7 @@ async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatisti
};

// Start clients
context.clients.forEach( async client => {
for (let client of context.clients) {
client.start();
const connectionSuccess = once(client, "connectionSuccess");

Expand All @@ -205,7 +205,7 @@ async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatisti
});
// setup empty subscription string array
context.subscriptions.push(new Array());
});
}

let operationTable = [
{ weight : 1, op: async () => { await doSubscribe(context); }},
Expand All @@ -214,7 +214,7 @@ async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatisti
{ weight : 20, op: async () => { await doPublish(context, mqtt5.QoS.AtLeastOnce); }}
];

var weightedOperations = operationTable.map(function (operation) {
let weightedOperations = operationTable.map(function (operation) {
return operation.weight;
});

Expand All @@ -232,13 +232,12 @@ async function runCanary(testContext: TestContext, mqttStats: CanaryMqttStatisti


// Stop and close clients
context.clients.forEach( async client => {
for (let client of context.clients) {
const stopped = once(client, "stopped");
client.stop();
await stopped;
client.close();
});

}
}

async function main(args : Args){
Expand Down
8 changes: 8 additions & 0 deletions lib/native/mqtt5.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,14 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
expect(willReceived).toEqual(true);
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Shared subscriptions test', async () => {
const config : mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
const publisher : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
const subscriber1 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
const subscriber2 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
await test_utils.doSharedSubscriptionsTest(publisher, subscriber1, subscriber2);
});

test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Operation failure - null subscribe', async () => {
await test_utils.nullSubscribeTest(new mqtt5.Mqtt5Client(createDirectIotCoreClientConfig()));
});
Expand Down
9 changes: 5 additions & 4 deletions source/module.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,9 @@ AWS_STATIC_ASSERT(NAPI_VERSION >= 4);

/* TODO:
* Hardcoded enum value for `napi_no_external_buffers_allowed`.
* The enum `napi_no_external_buffers_allowed` is introduced in node14.
* Use it for external buffer related changes after bump to node 14 */
* The enum `napi_no_external_buffers_allowed` is introduced in node21 and backport
* to node 14.21.2, 16.19.0, 18.13.0.
* Use `napi_no_external_buffers_allowed` for external buffer related changes after bump to node 21 */
#define NAPI_NO_EXTERNAL_BUFFER_ENUM_VALUE 22

static bool s_tsfn_enabled = false;
Expand Down Expand Up @@ -873,7 +874,7 @@ const char *aws_napi_status_to_str(napi_status status) {
case napi_bigint_expected:
reason = "napi_bigint_expected";
break;
case napi_no_external_buffers_allowed:
case NAPI_NO_EXTERNAL_BUFFER_ENUM_VALUE:
reason = "napi_no_external_buffers_allowed";
break;
}
Expand Down Expand Up @@ -976,7 +977,7 @@ napi_status aws_napi_create_external_arraybuffer(
napi_status external_buffer_status =
napi_create_external_arraybuffer(env, external_data, byte_length, finalize_cb, finalize_hint, result);

if (external_buffer_status == napi_no_external_buffers_allowed) {
if (external_buffer_status == NAPI_NO_EXTERNAL_BUFFER_ENUM_VALUE) {

// The external buffer is disabled, manually copy the external_data into Node
void *napi_buf_data = NULL;
Expand Down
96 changes: 96 additions & 0 deletions test/mqtt5.ts
Original file line number Diff line number Diff line change
Expand Up @@ -576,3 +576,99 @@ export async function doRetainTest(client1: mqtt5.Mqtt5Client, client2: mqtt5.Mq
await stopped1;
client1.close();
}

export async function doSharedSubscriptionsTest(publisher: mqtt5.Mqtt5Client, subscriber1: mqtt5.Mqtt5Client, subscriber2: mqtt5.Mqtt5Client) {
const payload : Buffer = Buffer.from("share", "utf-8");
const messagesNumber: number = 10;
const testTopic: string = `mqtt5_test${uuid()}`;
const sharedTopicfilter : string = `$share/crttest/${testTopic}`;

const publisherConnected = once(publisher, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
const publisherStopped = once(publisher, mqtt5.Mqtt5Client.STOPPED);

const subscriber1Connected = once(subscriber1, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
const subscriber1Stopped = once(subscriber1, mqtt5.Mqtt5Client.STOPPED);

const subscriber2Connected = once(subscriber2, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
const subscriber2Stopped = once(subscriber2, mqtt5.Mqtt5Client.STOPPED);

publisher.start();
subscriber1.start();
subscriber2.start();

await publisherConnected;
await subscriber1Connected;
await subscriber2Connected;

await subscriber1.subscribe({
subscriptions: [
{topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce}
]
});
await subscriber2.subscribe({
subscriptions: [
{topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce}
]
});

let receivedResolve : (value?: void | PromiseLike<void>) => void;
const receivedPromise = new Promise<void>((resolve, reject) => {
receivedResolve = resolve;
setTimeout(() => reject(new Error("Did not receive expected number of messages")), 4000);
});

// map: subscriberId -> receivedCount
const subscriberMessages = new Map();

const getOnMessageReceived = (subscriberId : string) => {
subscriberMessages.set(subscriberId, 0);

return (eventData: mqtt5.MessageReceivedEvent) => {
const packet: mqtt5.PublishPacket = eventData.message;

subscriberMessages.set(subscriberId, subscriberMessages.get(subscriberId) + 1);

let messagesReceived : number = 0;
subscriberMessages.forEach(v => messagesReceived += v);
if (messagesReceived == messagesNumber) {
receivedResolve();
}

expect(packet.qos).toEqual(mqtt5.QoS.AtLeastOnce);
expect(packet.topicName).toEqual(testTopic);
};
};

subscriber1.on('messageReceived', getOnMessageReceived("sub1"));
subscriber2.on('messageReceived', getOnMessageReceived("sub2"));

for (let i = 0; i < messagesNumber; ++i) {
publisher.publish({
topicName: testTopic,
qos: mqtt5.QoS.AtLeastOnce,
payload: payload
});
}

// Wait for receiving all published messages.
await receivedPromise;

// Wait a little longer to check if extra messages arrive.
await new Promise(resolve => setTimeout(resolve, 1000));

let messagesReceived : number = 0;
subscriberMessages.forEach(v => {
messagesReceived += v;
// Each subscriber should receive a portion of messages.
expect(v).toBeGreaterThan(0);
});
expect(messagesReceived).toEqual(messagesNumber);

subscriber2.stop();
subscriber1.stop();
publisher.stop();

await subscriber2Stopped;
await subscriber1Stopped;
await publisherStopped;
}

0 comments on commit 05c3c6a

Please sign in to comment.