Skip to content

Commit

Permalink
Extract RealtimeChannel MessageFilter-related code to new class
Browse files Browse the repository at this point in the history
In preparation for #1397 (making MessageFilter functionality
tree-shakable).
  • Loading branch information
lawrence-forooghian committed Oct 30, 2023
1 parent 768eb0c commit 948f255
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 103 deletions.
112 changes: 112 additions & 0 deletions src/common/lib/client/filteredsubscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import * as API from '../../../../ably';
import RealtimeChannel from './realtimechannel';
import Message from '../types/message';

export class FilteredSubscriptions {
static subscribeFilter(
channel: RealtimeChannel,
filter: API.Types.MessageFilter,
listener: API.Types.messageCallback<Message>
) {
const filteredListener = (m: Message) => {
const mapping: { [key in keyof API.Types.MessageFilter]: any } = {
name: m.name,
refTimeserial: m.extras?.ref?.timeserial,
refType: m.extras?.ref?.type,
isRef: !!m.extras?.ref?.timeserial,
clientId: m.clientId,
};
// Check if any values are defined in the filter and if they match the value in the message object
if (
Object.entries(filter).find(([key, value]) =>
value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false
)
) {
return;
}
listener(m);
};
this.addFilteredSubscription(channel, filter, listener, filteredListener);
channel.subscriptions.on(filteredListener);
}

// Adds a new filtered subscription
static addFilteredSubscription(
channel: RealtimeChannel,
filter: API.Types.MessageFilter,
realListener: API.Types.messageCallback<Message>,
filteredListener: API.Types.messageCallback<Message>
) {
if (!channel.filteredSubscriptions) {
channel.filteredSubscriptions = new Map<
API.Types.messageCallback<Message>,
Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>
>();
}
if (channel.filteredSubscriptions.has(realListener)) {
const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// Add the filtered listener to the map, or append to the array if this filter has already been used
realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]);
} else {
channel.filteredSubscriptions.set(
realListener,
new Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>([[filter, [filteredListener]]])
);
}
}

static getAndDeleteFilteredSubscriptions(
channel: RealtimeChannel,
filter: API.Types.MessageFilter | undefined,
realListener: API.Types.messageCallback<Message> | undefined
): API.Types.messageCallback<Message>[] {
// No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing
if (!channel.filteredSubscriptions) {
return [];
}
// Only a filter is passed in with no specific listener
if (!realListener && filter) {
// Return each listener which is attached to the specified filter object
return Array.from(channel.filteredSubscriptions.entries())
.map(([key, filterMaps]) => {
// Get (then delete) the maps matching this filter
let listenerMaps = filterMaps.get(filter);
filterMaps.delete(filter);
// Clear the parent if nothing is left
if (filterMaps.size === 0) {
channel.filteredSubscriptions?.delete(key);
}
return listenerMaps;
})
.reduce(
(prev, cur) => (cur ? (prev as API.Types.messageCallback<Message>[]).concat(...cur) : prev),
[]
) as API.Types.messageCallback<Message>[];
}

// No subscriptions for this listener
if (!realListener || !channel.filteredSubscriptions.has(realListener)) {
return [];
}
const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// If no filter is specified return all listeners using that function
if (!filter) {
// array.flat is not available unless we support es2019 or higher
const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []);
// remove the listener from the map
channel.filteredSubscriptions.delete(realListener);
return listeners;
}

let listeners = realListenerMap.get(filter);
realListenerMap.delete(filter);

return listeners || [];
}
}
108 changes: 5 additions & 103 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import ConnectionManager from '../transport/connectionmanager';
import ConnectionStateChange from './connectionstatechange';
import { ErrCallback, PaginatedResultCallback } from '../../types/utils';
import BaseRealtime from './baserealtime';
import { FilteredSubscriptions } from './filteredsubscriptions';

interface RealtimeHistoryParams {
start?: number;
Expand Down Expand Up @@ -414,121 +415,22 @@ class RealtimeChannel extends Channel {

// Filtered
if (event && typeof event === 'object' && !Array.isArray(event)) {
this._subscribeFilter(event, listener);
FilteredSubscriptions.subscribeFilter(this, event, listener);
} else {
this.subscriptions.on(event, listener);
}

return this.attach(callback || noop);
}

_subscribeFilter(filter: API.Types.MessageFilter, listener: API.Types.messageCallback<Message>) {
const filteredListener = (m: Message) => {
const mapping: { [key in keyof API.Types.MessageFilter]: any } = {
name: m.name,
refTimeserial: m.extras?.ref?.timeserial,
refType: m.extras?.ref?.type,
isRef: !!m.extras?.ref?.timeserial,
clientId: m.clientId,
};
// Check if any values are defined in the filter and if they match the value in the message object
if (
Object.entries(filter).find(([key, value]) =>
value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false
)
) {
return;
}
listener(m);
};
this._addFilteredSubscription(filter, listener, filteredListener);
this.subscriptions.on(filteredListener);
}

// Adds a new filtered subscription
_addFilteredSubscription(
filter: API.Types.MessageFilter,
realListener: API.Types.messageCallback<Message>,
filteredListener: API.Types.messageCallback<Message>
) {
if (!this.filteredSubscriptions) {
this.filteredSubscriptions = new Map<
API.Types.messageCallback<Message>,
Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>
>();
}
if (this.filteredSubscriptions.has(realListener)) {
const realListenerMap = this.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// Add the filtered listener to the map, or append to the array if this filter has already been used
realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]);
} else {
this.filteredSubscriptions.set(
realListener,
new Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>([[filter, [filteredListener]]])
);
}
}

_getAndDeleteFilteredSubscriptions(
filter: API.Types.MessageFilter | undefined,
realListener: API.Types.messageCallback<Message> | undefined
): API.Types.messageCallback<Message>[] {
// No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing
if (!this.filteredSubscriptions) {
return [];
}
// Only a filter is passed in with no specific listener
if (!realListener && filter) {
// Return each listener which is attached to the specified filter object
return Array.from(this.filteredSubscriptions.entries())
.map(([key, filterMaps]) => {
// Get (then delete) the maps matching this filter
let listenerMaps = filterMaps.get(filter);
filterMaps.delete(filter);
// Clear the parent if nothing is left
if (filterMaps.size === 0) {
this.filteredSubscriptions?.delete(key);
}
return listenerMaps;
})
.reduce(
(prev, cur) => (cur ? (prev as API.Types.messageCallback<Message>[]).concat(...cur) : prev),
[]
) as API.Types.messageCallback<Message>[];
}

// No subscriptions for this listener
if (!realListener || !this.filteredSubscriptions.has(realListener)) {
return [];
}
const realListenerMap = this.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// If no filter is specified return all listeners using that function
if (!filter) {
// array.flat is not available unless we support es2019 or higher
const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []);
// remove the listener from the map
this.filteredSubscriptions.delete(realListener);
return listeners;
}

let listeners = realListenerMap.get(filter);
realListenerMap.delete(filter);

return listeners || [];
}

unsubscribe(...args: unknown[] /* [event], listener */): void {
const [event, listener] = RealtimeChannel.processListenerArgs(args);

// If we either have a filtered listener, a filter or both we need to do additional processing to find the original function(s)
if ((typeof event === 'object' && !listener) || this.filteredSubscriptions?.has(listener)) {
this._getAndDeleteFilteredSubscriptions(event, listener).forEach((l) => this.subscriptions.off(l));
FilteredSubscriptions.getAndDeleteFilteredSubscriptions(this, event, listener).forEach((l) =>
this.subscriptions.off(l)
);
return;
}

Expand Down

0 comments on commit 948f255

Please sign in to comment.