Skip to content

Commit 7a7d1e7

Browse files
committed
* test: adding channel, channelGroup, subscription, SubscriptionSet listenr tests
* fix: unique channel and groups for subscription, no telemetry for event engine, avoid duplicate listener registration
1 parent 06d61dc commit 7a7d1e7

File tree

6 files changed

+303
-14
lines changed

6 files changed

+303
-14
lines changed

src/core/components/eventEmitter.js

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -227,12 +227,20 @@ export default class EventEmitter {
227227
}
228228

229229
addListener(l, channels, groups) {
230-
channels.forEach((c) =>
231-
this._channelListenerMap[c] ? this._channelListenerMap[c].push(l) : (this._channelListenerMap[c] = [l]),
232-
);
233-
groups.forEach((g) =>
234-
this._groupListenerMap[g] ? this._groupListenerMap[g].push(l) : (this._groupListenerMap[g] = [l]),
235-
);
230+
channels.forEach((c) => {
231+
if (this._channelListenerMap[c]) {
232+
if (!this._channelListenerMap[c].includes(l)) this._channelListenerMap[c].push(l);
233+
} else {
234+
this._channelListenerMap[c] = [l];
235+
}
236+
});
237+
groups.forEach((g) => {
238+
if (this._groupListenerMap[g]) {
239+
if (!this._groupListenerMap[g].includes(l)) this._groupListenerMap[g].push(l);
240+
} else {
241+
this._groupListenerMap[g] = [l];
242+
}
243+
});
236244
}
237245

238246
removeListener(listener, channels, groups) {

src/core/components/telemetry_manager.js

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,23 @@ export default class {
3333
}
3434

3535
startLatencyMeasure(operationType, identifier) {
36-
if (operationType === operationConstants.PNSubscribeOperation || !identifier) {
36+
if (
37+
operationType === operationConstants.PNSubscribeOperation ||
38+
operationConstants.PNReceiveMessagesOperation ||
39+
!identifier
40+
) {
3741
return;
3842
}
3943

4044
this._trackedLatencies[identifier] = Date.now();
4145
}
4246

4347
stopLatencyMeasure(operationType, identifier) {
44-
if (operationType === operationConstants.PNSubscribeOperation || !identifier) {
48+
if (
49+
operationType === operationConstants.PNSubscribeOperation ||
50+
operationConstants.PNReceiveMessagesOperation ||
51+
!identifier
52+
) {
4553
return;
4654
}
4755

src/core/endpoints/subscriptionUtils/handshake.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,9 @@ const endpoint = {
1111
},
1212

1313
getURL: ({ config }, params) => {
14-
const channelsString = params.channels ? params.channels.join(',') : ',';
15-
return `/v2/subscribe/${config.subscribeKey}/${utils.encodeString(channelsString)}/0`;
14+
const { channels = [] } = params;
15+
const stringifiedChannels = channels.length > 0 ? channels.join(',') : ',';
16+
return `/v2/subscribe/${config.subscribeKey}/${utils.encodeString(stringifiedChannels)}/0`;
1617
},
1718

1819
getRequestTimeout: ({ config }) => config.getSubscribeTimeout(),

src/core/endpoints/subscriptionUtils/receiveMessages.js

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ const endpoint = {
1717
},
1818

1919
getURL: ({ config }, params) => {
20-
const channelsString = params.channels ? params.channels.join(',') : ',';
21-
return `/v2/subscribe/${config.subscribeKey}/${utils.encodeString(channelsString)}/0`;
20+
const { channels = [] } = params;
21+
const stringifiedChannels = channels.length > 0 ? channels.join(',') : ',';
22+
return `/v2/subscribe/${config.subscribeKey}/${utils.encodeString(stringifiedChannels)}/0`;
2223
},
2324

2425
getRequestTimeout: ({ config }) => config.getSubscribeTimeout(),

src/event-engine/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ export class EventEngine {
4343
timetoken?: string;
4444
withPresence?: boolean;
4545
}) {
46-
this.channels = [...this.channels, ...(channels ?? [])];
47-
this.groups = [...this.groups, ...(channelGroups ?? [])];
46+
this.channels = Array.from(new Set([...this.channels, ...(channels ?? [])]));
47+
this.groups = Array.from(new Set([...this.groups, ...(channelGroups ?? [])]));
4848
if (withPresence) {
4949
this.channels.map((c) => this.channels.push(`${c}-pnpres`));
5050
this.groups.map((g) => this.groups.push(`${g}-pnpres`));
Lines changed: 271 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,271 @@
1+
import nock from 'nock';
2+
import _ from 'underscore';
3+
4+
import utils from '../../utils';
5+
import PubNub from '../../../src/node/index';
6+
7+
let pubnub;
8+
9+
describe('#listeners', () => {
10+
before(() => {
11+
nock.disableNetConnect();
12+
});
13+
14+
after(() => {
15+
nock.enableNetConnect();
16+
});
17+
18+
beforeEach(() => {
19+
nock.cleanAll();
20+
pubnub = new PubNub({
21+
subscribeKey: 'mySubKey',
22+
publishKey: 'myPublishKey',
23+
uuid: 'myUUID',
24+
enableEventEngine: true,
25+
autoNetworkDetection: false,
26+
});
27+
});
28+
29+
afterEach(() => {
30+
pubnub.stop();
31+
});
32+
33+
it('should pass messages of subscribed channel to its listener', async () => {
34+
utils
35+
.createNock()
36+
.get('/v2/subscribe/mySubKey/ch1/0')
37+
.query({
38+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
39+
uuid: 'myUUID',
40+
ee: '',
41+
state: '{}',
42+
tt: 0,
43+
})
44+
.reply(200, '{"t":{"t":"3","r":1},"m":[]}');
45+
utils
46+
.createNock()
47+
.get('/v2/subscribe/mySubKey/ch1/0')
48+
.query({
49+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
50+
uuid: 'myUUID',
51+
ee: '',
52+
tt: '3',
53+
tr: 1,
54+
})
55+
.reply(
56+
200,
57+
'{"t":{"t":"10","r":1},"m":[{"a":"3","f":514,"i":"demo","p":{"t":"17069673079697201","r":33},"k":"demo","c":"ch1","d":{"message":"My message!"}}]}',
58+
);
59+
var channel = pubnub.channel('ch1');
60+
var subscription = channel.subscription();
61+
var messagePromise = new Promise((resolveMessage) =>
62+
subscription.addListener({
63+
message: (m) => resolveMessage(m),
64+
}),
65+
);
66+
subscription.subscribe();
67+
const actual = await messagePromise;
68+
expect(JSON.stringify(actual.message)).to.equal('{"message":"My message!"}');
69+
});
70+
it('should subscribed to channel and presence channels', async () => {
71+
utils
72+
.createNock()
73+
.get('/v2/subscribe/mySubKey/ch1%2Cch1-pnpres/0')
74+
.query({
75+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
76+
uuid: 'myUUID',
77+
ee: '',
78+
state: '{}',
79+
tt: 0,
80+
})
81+
.reply(200, '{"t":{"t":"3","r":1},"m":[]}');
82+
utils
83+
.createNock()
84+
.get('/v2/subscribe/mySubKey/ch1%2Cch1-pnpres/0')
85+
.query({
86+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
87+
uuid: 'myUUID',
88+
ee: '',
89+
tt: '3',
90+
tr: 1,
91+
})
92+
.reply(
93+
200,
94+
'{"t":{"t":"10","r":1},"m":[{"a":"3","f":514,"i":"demo","p":{"t":"17069673079697201","r":33},"k":"demo","c":"ch1","d":{"message":"My message!"}}]}',
95+
);
96+
97+
const channel = pubnub.channel('ch1');
98+
const subscription = channel.subscription({ receivePresenceEvents: true });
99+
const messagePromise = new Promise((resolveMessage) =>
100+
subscription.addListener({
101+
message: (m) => resolveMessage(m),
102+
}),
103+
);
104+
subscription.subscribe();
105+
const actual = await messagePromise;
106+
expect(JSON.stringify(actual.message)).to.equal('{"message":"My message!"}');
107+
});
108+
109+
it('should work with subscriptionSet', async () => {
110+
utils
111+
.createNock()
112+
.get('/v2/subscribe/mySubKey/ch1%2Cch2/0')
113+
.query({
114+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
115+
uuid: 'myUUID',
116+
ee: '',
117+
state: '{}',
118+
tt: 0,
119+
})
120+
.reply(200, '{"t":{"t":"3","r":1},"m":[]}');
121+
utils
122+
.createNock()
123+
.get('/v2/subscribe/mySubKey/ch1%2Cch2/0')
124+
.query({
125+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
126+
uuid: 'myUUID',
127+
ee: '',
128+
tt: '3',
129+
tr: 1,
130+
})
131+
.reply(
132+
200,
133+
'{"t":{"t":"10","r":1},"m":[{"a":"3","f":514,"i":"demo","p":{"t":"17069673079697201","r":33},"k":"demo","c":"ch1","d":{"message":"My message!"}}]}',
134+
);
135+
136+
const channel = pubnub.channel('ch1');
137+
const subscription = channel.subscription();
138+
const subscriptionSet = subscription.addSubscription(pubnub.channel('ch2').subscription());
139+
const messagePromise = new Promise((resolveMessage) =>
140+
subscriptionSet.addListener({
141+
message: (m) => resolveMessage(m),
142+
}),
143+
);
144+
subscriptionSet.subscribe();
145+
const actual = await messagePromise;
146+
expect(JSON.stringify(actual.message)).to.equal('{"message":"My message!"}');
147+
});
148+
149+
it('listener should route presence event to registered handler', async () => {
150+
utils
151+
.createNock()
152+
.get('/v2/subscribe/mySubKey/ch1%2Cch1-pnpres/0')
153+
.query({
154+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
155+
uuid: 'myUUID',
156+
ee: '',
157+
state: '{}',
158+
tt: 0,
159+
})
160+
.reply(200, '{"t":{"t":"3","r":1},"m":[]}');
161+
utils
162+
.createNock()
163+
.get('/v2/subscribe/mySubKey/ch1%2Cch1-pnpres/0')
164+
.query({
165+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
166+
uuid: 'myUUID',
167+
ee: '',
168+
tt: '3',
169+
tr: 1,
170+
})
171+
.reply(
172+
200,
173+
'{"t":{"t":"17070458535164862","r":31},"m":[{"a":"0","f":0,"p":{"t":"17070458535164862","r":31},"k":"mySubKey","c":"ch1-pnpres","u":{"pn_action":"join","pn_uuid":"dartClient","pn_timestamp":1707045853,"pn_precise_timestamp":1707045853513,"pn_occupancy":2,"pn_ispresence":1,"pn_channel":"ch1"},"d":{"action":"join","uuid":"p2","timestamp":1707045853,"precise_timestamp":1707045853513,"occupancy":2},"b":"ch1-pnpres"}]}',
174+
);
175+
176+
const channel = pubnub.channel('ch1');
177+
const subscription = channel.subscription({ receivePresenceEvents: true });
178+
const presencePromise = new Promise((resolvePresence) =>
179+
subscription.addListener({
180+
presence: (p) => resolvePresence(p),
181+
}),
182+
);
183+
subscription.subscribe();
184+
const actual = await presencePromise;
185+
expect(actual.action).to.equal('join');
186+
expect(actual.occupancy).to.equal(2);
187+
});
188+
189+
it('add/remove listener should work on subscription', async () => {
190+
utils
191+
.createNock()
192+
.get('/v2/subscribe/mySubKey/ch1/0')
193+
.query({
194+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
195+
uuid: 'myUUID',
196+
ee: '',
197+
state: '{}',
198+
tt: 0,
199+
})
200+
.reply(200, '{"t":{"t":"3","r":1},"m":[]}');
201+
utils
202+
.createNock()
203+
.get('/v2/subscribe/mySubKey/ch1/0')
204+
.query({
205+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
206+
uuid: 'myUUID',
207+
ee: '',
208+
tt: '3',
209+
tr: 1,
210+
})
211+
.reply(
212+
200,
213+
'{"t":{"t":"10","r":1},"m":[{"a":"3","f":514,"i":"demo","p":{"t":"17069673079697201","r":33},"k":"demo","c":"ch1","d":{"message":"My message!"}}]}',
214+
);
215+
const messages = [];
216+
const channel = pubnub.channel('ch1');
217+
const subscription = channel.subscription();
218+
const listener = { message: (m) => messages.push(m) };
219+
subscription.addListener(listener);
220+
const messagePromise = new Promise((resolveMessage) =>
221+
subscription.addListener({
222+
message: (m) => resolveMessage(m),
223+
}),
224+
);
225+
subscription.removeListener(listener);
226+
subscription.subscribe();
227+
const actual = await messagePromise;
228+
expect(JSON.stringify(actual.message)).to.equal('{"message":"My message!"}');
229+
expect(messages.length).to.equal(0);
230+
});
231+
232+
it('should work with channel groups and their presence', async () => {
233+
utils
234+
.createNock()
235+
.get('/v2/subscribe/mySubKey/%2C/0')
236+
.query({
237+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
238+
uuid: 'myUUID',
239+
ee: '',
240+
state: '{}',
241+
tt: 0,
242+
'channel-group': 'cg1,cg1-pnpres',
243+
})
244+
.reply(200, '{"t":{"t":"3","r":1},"m":[]}');
245+
utils
246+
.createNock()
247+
.get('/v2/subscribe/mySubKey/%2C/0')
248+
.query({
249+
pnsdk: `PubNub-JS-Nodejs/${pubnub.getVersion()}`,
250+
uuid: 'myUUID',
251+
ee: '',
252+
tt: '3',
253+
tr: 1,
254+
'channel-group': 'cg1,cg1-pnpres',
255+
})
256+
.reply(
257+
200,
258+
'{"t":{"t":"17070655215847224","r":33},"m":[{"a":"0","f":0,"i":"cl1","p":{"t":"17070655215847224","r":31},"k":"mySubKey","c":"ch1","d":{"message":"My message!"},"b":"cg1"}]}',
259+
);
260+
var channelGroup = pubnub.channelGroup('cg1');
261+
var subscription = channelGroup.subscription({ receivePresenceEvents: true });
262+
var messagePromise = new Promise((resolveMessage) =>
263+
subscription.addListener({
264+
message: (m) => resolveMessage(m),
265+
}),
266+
);
267+
subscription.subscribe();
268+
const actual = await messagePromise;
269+
expect(JSON.stringify(actual.message)).to.equal('{"message":"My message!"}');
270+
});
271+
});

0 commit comments

Comments
 (0)