Skip to content

Commit 7fe853c

Browse files
authored
Merge branch 'main' into external_buffer_fix
2 parents cfedd33 + 3f57c09 commit 7fe853c

File tree

2 files changed

+104
-0
lines changed

2 files changed

+104
-0
lines changed

lib/native/mqtt5.spec.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -582,6 +582,14 @@ test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvir
582582
expect(willReceived).toEqual(true);
583583
});
584584

585+
test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Shared subscriptions test', async () => {
586+
const config : mqtt5.Mqtt5ClientConfig = createDirectIotCoreClientConfig();
587+
const publisher : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
588+
const subscriber1 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
589+
const subscriber2 : mqtt5.Mqtt5Client = new mqtt5.Mqtt5Client(config);
590+
await test_utils.doSharedSubscriptionsTest(publisher, subscriber1, subscriber2);
591+
});
592+
585593
test_utils.conditional_test(test_utils.ClientEnvironmentalConfig.hasIotCoreEnvironment())('Operation failure - null subscribe', async () => {
586594
await test_utils.nullSubscribeTest(new mqtt5.Mqtt5Client(createDirectIotCoreClientConfig()));
587595
});

test/mqtt5.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -576,3 +576,99 @@ export async function doRetainTest(client1: mqtt5.Mqtt5Client, client2: mqtt5.Mq
576576
await stopped1;
577577
client1.close();
578578
}
579+
580+
export async function doSharedSubscriptionsTest(publisher: mqtt5.Mqtt5Client, subscriber1: mqtt5.Mqtt5Client, subscriber2: mqtt5.Mqtt5Client) {
581+
const payload : Buffer = Buffer.from("share", "utf-8");
582+
const messagesNumber: number = 10;
583+
const testTopic: string = `mqtt5_test${uuid()}`;
584+
const sharedTopicfilter : string = `$share/crttest/${testTopic}`;
585+
586+
const publisherConnected = once(publisher, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
587+
const publisherStopped = once(publisher, mqtt5.Mqtt5Client.STOPPED);
588+
589+
const subscriber1Connected = once(subscriber1, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
590+
const subscriber1Stopped = once(subscriber1, mqtt5.Mqtt5Client.STOPPED);
591+
592+
const subscriber2Connected = once(subscriber2, mqtt5.Mqtt5Client.CONNECTION_SUCCESS);
593+
const subscriber2Stopped = once(subscriber2, mqtt5.Mqtt5Client.STOPPED);
594+
595+
publisher.start();
596+
subscriber1.start();
597+
subscriber2.start();
598+
599+
await publisherConnected;
600+
await subscriber1Connected;
601+
await subscriber2Connected;
602+
603+
await subscriber1.subscribe({
604+
subscriptions: [
605+
{topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce}
606+
]
607+
});
608+
await subscriber2.subscribe({
609+
subscriptions: [
610+
{topicFilter: sharedTopicfilter, qos: mqtt5.QoS.AtLeastOnce}
611+
]
612+
});
613+
614+
let receivedResolve : (value?: void | PromiseLike<void>) => void;
615+
const receivedPromise = new Promise<void>((resolve, reject) => {
616+
receivedResolve = resolve;
617+
setTimeout(() => reject(new Error("Did not receive expected number of messages")), 4000);
618+
});
619+
620+
// map: subscriberId -> receivedCount
621+
const subscriberMessages = new Map();
622+
623+
const getOnMessageReceived = (subscriberId : string) => {
624+
subscriberMessages.set(subscriberId, 0);
625+
626+
return (eventData: mqtt5.MessageReceivedEvent) => {
627+
const packet: mqtt5.PublishPacket = eventData.message;
628+
629+
subscriberMessages.set(subscriberId, subscriberMessages.get(subscriberId) + 1);
630+
631+
let messagesReceived : number = 0;
632+
subscriberMessages.forEach(v => messagesReceived += v);
633+
if (messagesReceived == messagesNumber) {
634+
receivedResolve();
635+
}
636+
637+
expect(packet.qos).toEqual(mqtt5.QoS.AtLeastOnce);
638+
expect(packet.topicName).toEqual(testTopic);
639+
};
640+
};
641+
642+
subscriber1.on('messageReceived', getOnMessageReceived("sub1"));
643+
subscriber2.on('messageReceived', getOnMessageReceived("sub2"));
644+
645+
for (let i = 0; i < messagesNumber; ++i) {
646+
publisher.publish({
647+
topicName: testTopic,
648+
qos: mqtt5.QoS.AtLeastOnce,
649+
payload: payload
650+
});
651+
}
652+
653+
// Wait for receiving all published messages.
654+
await receivedPromise;
655+
656+
// Wait a little longer to check if extra messages arrive.
657+
await new Promise(resolve => setTimeout(resolve, 1000));
658+
659+
let messagesReceived : number = 0;
660+
subscriberMessages.forEach(v => {
661+
messagesReceived += v;
662+
// Each subscriber should receive a portion of messages.
663+
expect(v).toBeGreaterThan(0);
664+
});
665+
expect(messagesReceived).toEqual(messagesNumber);
666+
667+
subscriber2.stop();
668+
subscriber1.stop();
669+
publisher.stop();
670+
671+
await subscriber2Stopped;
672+
await subscriber1Stopped;
673+
await publisherStopped;
674+
}

0 commit comments

Comments
 (0)