From 948f2554687ce8c431a0f2629ac017b0d8846328 Mon Sep 17 00:00:00 2001 From: Lawrence Forooghian Date: Fri, 27 Oct 2023 15:11:04 -0300 Subject: [PATCH] Extract RealtimeChannel MessageFilter-related code to new class In preparation for #1397 (making MessageFilter functionality tree-shakable). --- .../lib/client/filteredsubscriptions.ts | 112 ++++++++++++++++++ src/common/lib/client/realtimechannel.ts | 108 +---------------- 2 files changed, 117 insertions(+), 103 deletions(-) create mode 100644 src/common/lib/client/filteredsubscriptions.ts diff --git a/src/common/lib/client/filteredsubscriptions.ts b/src/common/lib/client/filteredsubscriptions.ts new file mode 100644 index 0000000000..1588fa4962 --- /dev/null +++ b/src/common/lib/client/filteredsubscriptions.ts @@ -0,0 +1,112 @@ +import * as API from '../../../../ably'; +import RealtimeChannel from './realtimechannel'; +import Message from '../types/message'; + +export class FilteredSubscriptions { + static subscribeFilter( + channel: RealtimeChannel, + filter: API.Types.MessageFilter, + listener: API.Types.messageCallback + ) { + const filteredListener = (m: Message) => { + const mapping: { [key in keyof API.Types.MessageFilter]: any } = { + name: m.name, + refTimeserial: m.extras?.ref?.timeserial, + refType: m.extras?.ref?.type, + isRef: !!m.extras?.ref?.timeserial, + clientId: m.clientId, + }; + // Check if any values are defined in the filter and if they match the value in the message object + if ( + Object.entries(filter).find(([key, value]) => + value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false + ) + ) { + return; + } + listener(m); + }; + this.addFilteredSubscription(channel, filter, listener, filteredListener); + channel.subscriptions.on(filteredListener); + } + + // Adds a new filtered subscription + static addFilteredSubscription( + channel: RealtimeChannel, + filter: API.Types.MessageFilter, + realListener: API.Types.messageCallback, + filteredListener: API.Types.messageCallback + ) { + if (!channel.filteredSubscriptions) { + channel.filteredSubscriptions = new Map< + API.Types.messageCallback, + Map[]> + >(); + } + if (channel.filteredSubscriptions.has(realListener)) { + const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map< + API.Types.MessageFilter, + API.Types.messageCallback[] + >; + // Add the filtered listener to the map, or append to the array if this filter has already been used + realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]); + } else { + channel.filteredSubscriptions.set( + realListener, + new Map[]>([[filter, [filteredListener]]]) + ); + } + } + + static getAndDeleteFilteredSubscriptions( + channel: RealtimeChannel, + filter: API.Types.MessageFilter | undefined, + realListener: API.Types.messageCallback | undefined + ): API.Types.messageCallback[] { + // No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing + if (!channel.filteredSubscriptions) { + return []; + } + // Only a filter is passed in with no specific listener + if (!realListener && filter) { + // Return each listener which is attached to the specified filter object + return Array.from(channel.filteredSubscriptions.entries()) + .map(([key, filterMaps]) => { + // Get (then delete) the maps matching this filter + let listenerMaps = filterMaps.get(filter); + filterMaps.delete(filter); + // Clear the parent if nothing is left + if (filterMaps.size === 0) { + channel.filteredSubscriptions?.delete(key); + } + return listenerMaps; + }) + .reduce( + (prev, cur) => (cur ? (prev as API.Types.messageCallback[]).concat(...cur) : prev), + [] + ) as API.Types.messageCallback[]; + } + + // No subscriptions for this listener + if (!realListener || !channel.filteredSubscriptions.has(realListener)) { + return []; + } + const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map< + API.Types.MessageFilter, + API.Types.messageCallback[] + >; + // If no filter is specified return all listeners using that function + if (!filter) { + // array.flat is not available unless we support es2019 or higher + const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []); + // remove the listener from the map + channel.filteredSubscriptions.delete(realListener); + return listeners; + } + + let listeners = realListenerMap.get(filter); + realListenerMap.delete(filter); + + return listeners || []; + } +} diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 503189ceb8..0986f426ab 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -14,6 +14,7 @@ import ConnectionManager from '../transport/connectionmanager'; import ConnectionStateChange from './connectionstatechange'; import { ErrCallback, PaginatedResultCallback } from '../../types/utils'; import BaseRealtime from './baserealtime'; +import { FilteredSubscriptions } from './filteredsubscriptions'; interface RealtimeHistoryParams { start?: number; @@ -414,7 +415,7 @@ class RealtimeChannel extends Channel { // Filtered if (event && typeof event === 'object' && !Array.isArray(event)) { - this._subscribeFilter(event, listener); + FilteredSubscriptions.subscribeFilter(this, event, listener); } else { this.subscriptions.on(event, listener); } @@ -422,113 +423,14 @@ class RealtimeChannel extends Channel { return this.attach(callback || noop); } - _subscribeFilter(filter: API.Types.MessageFilter, listener: API.Types.messageCallback) { - const filteredListener = (m: Message) => { - const mapping: { [key in keyof API.Types.MessageFilter]: any } = { - name: m.name, - refTimeserial: m.extras?.ref?.timeserial, - refType: m.extras?.ref?.type, - isRef: !!m.extras?.ref?.timeserial, - clientId: m.clientId, - }; - // Check if any values are defined in the filter and if they match the value in the message object - if ( - Object.entries(filter).find(([key, value]) => - value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false - ) - ) { - return; - } - listener(m); - }; - this._addFilteredSubscription(filter, listener, filteredListener); - this.subscriptions.on(filteredListener); - } - - // Adds a new filtered subscription - _addFilteredSubscription( - filter: API.Types.MessageFilter, - realListener: API.Types.messageCallback, - filteredListener: API.Types.messageCallback - ) { - if (!this.filteredSubscriptions) { - this.filteredSubscriptions = new Map< - API.Types.messageCallback, - Map[]> - >(); - } - if (this.filteredSubscriptions.has(realListener)) { - const realListenerMap = this.filteredSubscriptions.get(realListener) as Map< - API.Types.MessageFilter, - API.Types.messageCallback[] - >; - // Add the filtered listener to the map, or append to the array if this filter has already been used - realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]); - } else { - this.filteredSubscriptions.set( - realListener, - new Map[]>([[filter, [filteredListener]]]) - ); - } - } - - _getAndDeleteFilteredSubscriptions( - filter: API.Types.MessageFilter | undefined, - realListener: API.Types.messageCallback | undefined - ): API.Types.messageCallback[] { - // No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing - if (!this.filteredSubscriptions) { - return []; - } - // Only a filter is passed in with no specific listener - if (!realListener && filter) { - // Return each listener which is attached to the specified filter object - return Array.from(this.filteredSubscriptions.entries()) - .map(([key, filterMaps]) => { - // Get (then delete) the maps matching this filter - let listenerMaps = filterMaps.get(filter); - filterMaps.delete(filter); - // Clear the parent if nothing is left - if (filterMaps.size === 0) { - this.filteredSubscriptions?.delete(key); - } - return listenerMaps; - }) - .reduce( - (prev, cur) => (cur ? (prev as API.Types.messageCallback[]).concat(...cur) : prev), - [] - ) as API.Types.messageCallback[]; - } - - // No subscriptions for this listener - if (!realListener || !this.filteredSubscriptions.has(realListener)) { - return []; - } - const realListenerMap = this.filteredSubscriptions.get(realListener) as Map< - API.Types.MessageFilter, - API.Types.messageCallback[] - >; - // If no filter is specified return all listeners using that function - if (!filter) { - // array.flat is not available unless we support es2019 or higher - const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []); - // remove the listener from the map - this.filteredSubscriptions.delete(realListener); - return listeners; - } - - let listeners = realListenerMap.get(filter); - realListenerMap.delete(filter); - - return listeners || []; - } - unsubscribe(...args: unknown[] /* [event], listener */): void { const [event, listener] = RealtimeChannel.processListenerArgs(args); // If we either have a filtered listener, a filter or both we need to do additional processing to find the original function(s) if ((typeof event === 'object' && !listener) || this.filteredSubscriptions?.has(listener)) { - this._getAndDeleteFilteredSubscriptions(event, listener).forEach((l) => this.subscriptions.off(l)); + FilteredSubscriptions.getAndDeleteFilteredSubscriptions(this, event, listener).forEach((l) => + this.subscriptions.off(l) + ); return; }