Skip to content

Commit 61cf994

Browse files
Extract RealtimeChannel MessageFilter-related code to new class
In preparation for #1397 (making MessageFilter functionality tree-shakable).
1 parent 674e88a commit 61cf994

File tree

2 files changed

+117
-103
lines changed

2 files changed

+117
-103
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import * as API from '../../../../ably';
2+
import RealtimeChannel from './realtimechannel';
3+
import Message from '../types/message';
4+
5+
export class FilteredSubscriptions {
6+
static subscribeFilter(
7+
channel: RealtimeChannel,
8+
filter: API.Types.MessageFilter,
9+
listener: API.Types.messageCallback<Message>
10+
) {
11+
const filteredListener = (m: Message) => {
12+
const mapping: { [key in keyof API.Types.MessageFilter]: any } = {
13+
name: m.name,
14+
refTimeserial: m.extras?.ref?.timeserial,
15+
refType: m.extras?.ref?.type,
16+
isRef: !!m.extras?.ref?.timeserial,
17+
clientId: m.clientId,
18+
};
19+
// Check if any values are defined in the filter and if they match the value in the message object
20+
if (
21+
Object.entries(filter).find(([key, value]) =>
22+
value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false
23+
)
24+
) {
25+
return;
26+
}
27+
listener(m);
28+
};
29+
this.addFilteredSubscription(channel, filter, listener, filteredListener);
30+
channel.subscriptions.on(filteredListener);
31+
}
32+
33+
// Adds a new filtered subscription
34+
static addFilteredSubscription(
35+
channel: RealtimeChannel,
36+
filter: API.Types.MessageFilter,
37+
realListener: API.Types.messageCallback<Message>,
38+
filteredListener: API.Types.messageCallback<Message>
39+
) {
40+
if (!channel.filteredSubscriptions) {
41+
channel.filteredSubscriptions = new Map<
42+
API.Types.messageCallback<Message>,
43+
Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>
44+
>();
45+
}
46+
if (channel.filteredSubscriptions.has(realListener)) {
47+
const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map<
48+
API.Types.MessageFilter,
49+
API.Types.messageCallback<Message>[]
50+
>;
51+
// Add the filtered listener to the map, or append to the array if this filter has already been used
52+
realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]);
53+
} else {
54+
channel.filteredSubscriptions.set(
55+
realListener,
56+
new Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>([[filter, [filteredListener]]])
57+
);
58+
}
59+
}
60+
61+
static getAndDeleteFilteredSubscriptions(
62+
channel: RealtimeChannel,
63+
filter: API.Types.MessageFilter | undefined,
64+
realListener: API.Types.messageCallback<Message> | undefined
65+
): API.Types.messageCallback<Message>[] {
66+
// No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing
67+
if (!channel.filteredSubscriptions) {
68+
return [];
69+
}
70+
// Only a filter is passed in with no specific listener
71+
if (!realListener && filter) {
72+
// Return each listener which is attached to the specified filter object
73+
return Array.from(channel.filteredSubscriptions.entries())
74+
.map(([key, filterMaps]) => {
75+
// Get (then delete) the maps matching this filter
76+
let listenerMaps = filterMaps.get(filter);
77+
filterMaps.delete(filter);
78+
// Clear the parent if nothing is left
79+
if (filterMaps.size === 0) {
80+
channel.filteredSubscriptions?.delete(key);
81+
}
82+
return listenerMaps;
83+
})
84+
.reduce(
85+
(prev, cur) => (cur ? (prev as API.Types.messageCallback<Message>[]).concat(...cur) : prev),
86+
[]
87+
) as API.Types.messageCallback<Message>[];
88+
}
89+
90+
// No subscriptions for this listener
91+
if (!realListener || !channel.filteredSubscriptions.has(realListener)) {
92+
return [];
93+
}
94+
const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map<
95+
API.Types.MessageFilter,
96+
API.Types.messageCallback<Message>[]
97+
>;
98+
// If no filter is specified return all listeners using that function
99+
if (!filter) {
100+
// array.flat is not available unless we support es2019 or higher
101+
const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []);
102+
// remove the listener from the map
103+
channel.filteredSubscriptions.delete(realListener);
104+
return listeners;
105+
}
106+
107+
let listeners = realListenerMap.get(filter);
108+
realListenerMap.delete(filter);
109+
110+
return listeners || [];
111+
}
112+
}

src/common/lib/client/realtimechannel.ts

Lines changed: 5 additions & 103 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import ConnectionManager from '../transport/connectionmanager';
1414
import ConnectionStateChange from './connectionstatechange';
1515
import { ErrCallback, PaginatedResultCallback, StandardCallback } from '../../types/utils';
1616
import BaseRealtime from './baserealtime';
17+
import { FilteredSubscriptions } from './filteredsubscriptions';
1718

1819
interface RealtimeHistoryParams {
1920
start?: number;
@@ -438,121 +439,22 @@ class RealtimeChannel extends Channel {
438439

439440
// Filtered
440441
if (event && typeof event === 'object' && !Array.isArray(event)) {
441-
this._subscribeFilter(event, listener);
442+
FilteredSubscriptions.subscribeFilter(this, event, listener);
442443
} else {
443444
this.subscriptions.on(event, listener);
444445
}
445446

446447
return this.attach(callback || noop);
447448
}
448449

449-
_subscribeFilter(filter: API.Types.MessageFilter, listener: API.Types.messageCallback<Message>) {
450-
const filteredListener = (m: Message) => {
451-
const mapping: { [key in keyof API.Types.MessageFilter]: any } = {
452-
name: m.name,
453-
refTimeserial: m.extras?.ref?.timeserial,
454-
refType: m.extras?.ref?.type,
455-
isRef: !!m.extras?.ref?.timeserial,
456-
clientId: m.clientId,
457-
};
458-
// Check if any values are defined in the filter and if they match the value in the message object
459-
if (
460-
Object.entries(filter).find(([key, value]) =>
461-
value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false
462-
)
463-
) {
464-
return;
465-
}
466-
listener(m);
467-
};
468-
this._addFilteredSubscription(filter, listener, filteredListener);
469-
this.subscriptions.on(filteredListener);
470-
}
471-
472-
// Adds a new filtered subscription
473-
_addFilteredSubscription(
474-
filter: API.Types.MessageFilter,
475-
realListener: API.Types.messageCallback<Message>,
476-
filteredListener: API.Types.messageCallback<Message>
477-
) {
478-
if (!this.filteredSubscriptions) {
479-
this.filteredSubscriptions = new Map<
480-
API.Types.messageCallback<Message>,
481-
Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>
482-
>();
483-
}
484-
if (this.filteredSubscriptions.has(realListener)) {
485-
const realListenerMap = this.filteredSubscriptions.get(realListener) as Map<
486-
API.Types.MessageFilter,
487-
API.Types.messageCallback<Message>[]
488-
>;
489-
// Add the filtered listener to the map, or append to the array if this filter has already been used
490-
realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]);
491-
} else {
492-
this.filteredSubscriptions.set(
493-
realListener,
494-
new Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>([[filter, [filteredListener]]])
495-
);
496-
}
497-
}
498-
499-
_getAndDeleteFilteredSubscriptions(
500-
filter: API.Types.MessageFilter | undefined,
501-
realListener: API.Types.messageCallback<Message> | undefined
502-
): API.Types.messageCallback<Message>[] {
503-
// No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing
504-
if (!this.filteredSubscriptions) {
505-
return [];
506-
}
507-
// Only a filter is passed in with no specific listener
508-
if (!realListener && filter) {
509-
// Return each listener which is attached to the specified filter object
510-
return Array.from(this.filteredSubscriptions.entries())
511-
.map(([key, filterMaps]) => {
512-
// Get (then delete) the maps matching this filter
513-
let listenerMaps = filterMaps.get(filter);
514-
filterMaps.delete(filter);
515-
// Clear the parent if nothing is left
516-
if (filterMaps.size === 0) {
517-
this.filteredSubscriptions?.delete(key);
518-
}
519-
return listenerMaps;
520-
})
521-
.reduce(
522-
(prev, cur) => (cur ? (prev as API.Types.messageCallback<Message>[]).concat(...cur) : prev),
523-
[]
524-
) as API.Types.messageCallback<Message>[];
525-
}
526-
527-
// No subscriptions for this listener
528-
if (!realListener || !this.filteredSubscriptions.has(realListener)) {
529-
return [];
530-
}
531-
const realListenerMap = this.filteredSubscriptions.get(realListener) as Map<
532-
API.Types.MessageFilter,
533-
API.Types.messageCallback<Message>[]
534-
>;
535-
// If no filter is specified return all listeners using that function
536-
if (!filter) {
537-
// array.flat is not available unless we support es2019 or higher
538-
const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []);
539-
// remove the listener from the map
540-
this.filteredSubscriptions.delete(realListener);
541-
return listeners;
542-
}
543-
544-
let listeners = realListenerMap.get(filter);
545-
realListenerMap.delete(filter);
546-
547-
return listeners || [];
548-
}
549-
550450
unsubscribe(...args: unknown[] /* [event], listener */): void {
551451
const [event, listener] = RealtimeChannel.processListenerArgs(args);
552452

553453
// If we either have a filtered listener, a filter or both we need to do additional processing to find the original function(s)
554454
if ((typeof event === 'object' && !listener) || this.filteredSubscriptions?.has(listener)) {
555-
this._getAndDeleteFilteredSubscriptions(event, listener).forEach((l) => this.subscriptions.off(l));
455+
FilteredSubscriptions.getAndDeleteFilteredSubscriptions(this, event, listener).forEach((l) =>
456+
this.subscriptions.off(l)
457+
);
556458
return;
557459
}
558460

0 commit comments

Comments
 (0)