Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SDK-3736] Make MessageFilter subscription filtering tree-shakable #1473

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions scripts/moduleReport.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ const moduleNames = [
'WebSocketTransport',
'XHRRequest',
'FetchRequest',
'MessageInteractions',
];

// List of all free-standing functions exported by the library along with the
Expand Down
10 changes: 10 additions & 0 deletions src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic';
import { throwMissingModuleError } from '../util/utils';
import { MsgPack } from 'common/types/msgpack';
import { HTTPRequestImplementations } from 'platform/web/lib/http/http';
import { FilteredSubscriptions } from './filteredsubscriptions';

type BatchResult<T> = API.Types.BatchResult<T>;
type BatchPublishSpec = API.Types.BatchPublishSpec;
Expand Down Expand Up @@ -44,6 +45,7 @@ class BaseClient {
readonly _MsgPack: MsgPack | null;
// Extra HTTP request implementations available to this client, in addition to those in web’s Http.bundledRequestImplementations
readonly _additionalHTTPRequestImplementations: HTTPRequestImplementations;
private readonly __FilteredSubscriptions: typeof FilteredSubscriptions | null;

constructor(options: ClientOptions | string, modules: ModulesMap) {
this._additionalHTTPRequestImplementations = modules;
Expand Down Expand Up @@ -98,6 +100,7 @@ class BaseClient {

this._rest = modules.Rest ? new modules.Rest(this) : null;
this._Crypto = modules.Crypto ?? null;
this.__FilteredSubscriptions = modules.MessageInteractions ?? null;
}

private get rest(): Rest {
Expand All @@ -107,6 +110,13 @@ class BaseClient {
return this._rest;
}

get _FilteredSubscriptions(): typeof FilteredSubscriptions {
if (!this.__FilteredSubscriptions) {
throwMissingModuleError('MessageInteractions');
}
return this.__FilteredSubscriptions;
}

get channels() {
return this.rest.channels;
}
Expand Down
2 changes: 2 additions & 0 deletions src/common/lib/client/defaultrealtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { MsgPack } from 'common/types/msgpack';
import RealtimePresence from './realtimepresence';
import { DefaultPresenceMessage } from '../types/defaultpresencemessage';
import initialiseWebSocketTransport from '../transport/websockettransport';
import { FilteredSubscriptions } from './filteredsubscriptions';

/**
`DefaultRealtime` is the class that the non tree-shakable version of the SDK exports as `Realtime`. It ensures that this version of the SDK includes all of the functionality which is optionally available in the tree-shakable version.
Expand All @@ -27,6 +28,7 @@ export class DefaultRealtime extends BaseRealtime {
MsgPack,
RealtimePresence,
WebSocketTransport: initialiseWebSocketTransport,
MessageInteractions: FilteredSubscriptions,
});
}

Expand Down
112 changes: 112 additions & 0 deletions src/common/lib/client/filteredsubscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
import * as API from '../../../../ably';
import RealtimeChannel from './realtimechannel';
import Message from '../types/message';

export class FilteredSubscriptions {
static subscribeFilter(
channel: RealtimeChannel,
filter: API.Types.MessageFilter,
listener: API.Types.messageCallback<Message>
) {
const filteredListener = (m: Message) => {
const mapping: { [key in keyof API.Types.MessageFilter]: any } = {
name: m.name,
refTimeserial: m.extras?.ref?.timeserial,
refType: m.extras?.ref?.type,
isRef: !!m.extras?.ref?.timeserial,
clientId: m.clientId,
};
// Check if any values are defined in the filter and if they match the value in the message object
if (
Object.entries(filter).find(([key, value]) =>
value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false
)
) {
return;
}
listener(m);
};
this.addFilteredSubscription(channel, filter, listener, filteredListener);
channel.subscriptions.on(filteredListener);
}

// Adds a new filtered subscription
static addFilteredSubscription(
channel: RealtimeChannel,
filter: API.Types.MessageFilter,
realListener: API.Types.messageCallback<Message>,
filteredListener: API.Types.messageCallback<Message>
) {
if (!channel.filteredSubscriptions) {
channel.filteredSubscriptions = new Map<
API.Types.messageCallback<Message>,
Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>
>();
}
if (channel.filteredSubscriptions.has(realListener)) {
const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// Add the filtered listener to the map, or append to the array if this filter has already been used
realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]);
} else {
channel.filteredSubscriptions.set(
realListener,
new Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>([[filter, [filteredListener]]])
);
}
}

static getAndDeleteFilteredSubscriptions(
channel: RealtimeChannel,
filter: API.Types.MessageFilter | undefined,
realListener: API.Types.messageCallback<Message> | undefined
): API.Types.messageCallback<Message>[] {
// No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing
if (!channel.filteredSubscriptions) {
return [];
}
// Only a filter is passed in with no specific listener
if (!realListener && filter) {
// Return each listener which is attached to the specified filter object
return Array.from(channel.filteredSubscriptions.entries())
.map(([key, filterMaps]) => {
// Get (then delete) the maps matching this filter
let listenerMaps = filterMaps.get(filter);
filterMaps.delete(filter);
// Clear the parent if nothing is left
if (filterMaps.size === 0) {
channel.filteredSubscriptions?.delete(key);
}
return listenerMaps;
})
.reduce(
(prev, cur) => (cur ? (prev as API.Types.messageCallback<Message>[]).concat(...cur) : prev),
[]
) as API.Types.messageCallback<Message>[];
}

// No subscriptions for this listener
if (!realListener || !channel.filteredSubscriptions.has(realListener)) {
return [];
}
const realListenerMap = channel.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// If no filter is specified return all listeners using that function
if (!filter) {
// array.flat is not available unless we support es2019 or higher
const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []);
// remove the listener from the map
channel.filteredSubscriptions.delete(realListener);
return listeners;
}

let listeners = realListenerMap.get(filter);
realListenerMap.delete(filter);

return listeners || [];
}
}
2 changes: 2 additions & 0 deletions src/common/lib/client/modulesmap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import RealtimePresence from './realtimepresence';
import { TransportInitialiser } from '../transport/connectionmanager';
import XHRRequest from 'platform/web/lib/http/request/xhrrequest';
import fetchRequest from 'platform/web/lib/http/request/fetchrequest';
import { FilteredSubscriptions } from './filteredsubscriptions';

export interface ModulesMap {
Rest?: typeof Rest;
Expand All @@ -16,6 +17,7 @@ export interface ModulesMap {
XHRStreaming?: TransportInitialiser;
XHRRequest?: typeof XHRRequest;
FetchRequest?: typeof fetchRequest;
MessageInteractions?: typeof FilteredSubscriptions;
}

export const allCommonModules: ModulesMap = { Rest };
107 changes: 4 additions & 103 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -438,121 +438,22 @@ class RealtimeChannel extends Channel {

// Filtered
if (event && typeof event === 'object' && !Array.isArray(event)) {
this._subscribeFilter(event, listener);
this.client._FilteredSubscriptions.subscribeFilter(this, event, listener);
} else {
this.subscriptions.on(event, listener);
}

return this.attach(callback || noop);
}

_subscribeFilter(filter: API.Types.MessageFilter, listener: API.Types.messageCallback<Message>) {
const filteredListener = (m: Message) => {
const mapping: { [key in keyof API.Types.MessageFilter]: any } = {
name: m.name,
refTimeserial: m.extras?.ref?.timeserial,
refType: m.extras?.ref?.type,
isRef: !!m.extras?.ref?.timeserial,
clientId: m.clientId,
};
// Check if any values are defined in the filter and if they match the value in the message object
if (
Object.entries(filter).find(([key, value]) =>
value !== undefined ? mapping[key as keyof API.Types.MessageFilter] !== value : false
)
) {
return;
}
listener(m);
};
this._addFilteredSubscription(filter, listener, filteredListener);
this.subscriptions.on(filteredListener);
}

// Adds a new filtered subscription
_addFilteredSubscription(
filter: API.Types.MessageFilter,
realListener: API.Types.messageCallback<Message>,
filteredListener: API.Types.messageCallback<Message>
) {
if (!this.filteredSubscriptions) {
this.filteredSubscriptions = new Map<
API.Types.messageCallback<Message>,
Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>
>();
}
if (this.filteredSubscriptions.has(realListener)) {
const realListenerMap = this.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// Add the filtered listener to the map, or append to the array if this filter has already been used
realListenerMap.set(filter, realListenerMap?.get(filter)?.concat(filteredListener) || [filteredListener]);
} else {
this.filteredSubscriptions.set(
realListener,
new Map<API.Types.MessageFilter, API.Types.messageCallback<Message>[]>([[filter, [filteredListener]]])
);
}
}

_getAndDeleteFilteredSubscriptions(
filter: API.Types.MessageFilter | undefined,
realListener: API.Types.messageCallback<Message> | undefined
): API.Types.messageCallback<Message>[] {
// No filtered subscriptions map means there has been no filtered subscriptions yet, so return nothing
if (!this.filteredSubscriptions) {
return [];
}
// Only a filter is passed in with no specific listener
if (!realListener && filter) {
// Return each listener which is attached to the specified filter object
return Array.from(this.filteredSubscriptions.entries())
.map(([key, filterMaps]) => {
// Get (then delete) the maps matching this filter
let listenerMaps = filterMaps.get(filter);
filterMaps.delete(filter);
// Clear the parent if nothing is left
if (filterMaps.size === 0) {
this.filteredSubscriptions?.delete(key);
}
return listenerMaps;
})
.reduce(
(prev, cur) => (cur ? (prev as API.Types.messageCallback<Message>[]).concat(...cur) : prev),
[]
) as API.Types.messageCallback<Message>[];
}

// No subscriptions for this listener
if (!realListener || !this.filteredSubscriptions.has(realListener)) {
return [];
}
const realListenerMap = this.filteredSubscriptions.get(realListener) as Map<
API.Types.MessageFilter,
API.Types.messageCallback<Message>[]
>;
// If no filter is specified return all listeners using that function
if (!filter) {
// array.flat is not available unless we support es2019 or higher
const listeners = Array.from(realListenerMap.values()).reduce((prev, cur) => prev.concat(...cur), []);
// remove the listener from the map
this.filteredSubscriptions.delete(realListener);
return listeners;
}

let listeners = realListenerMap.get(filter);
realListenerMap.delete(filter);

return listeners || [];
}

unsubscribe(...args: unknown[] /* [event], listener */): void {
const [event, listener] = RealtimeChannel.processListenerArgs(args);

// If we either have a filtered listener, a filter or both we need to do additional processing to find the original function(s)
if ((typeof event === 'object' && !listener) || this.filteredSubscriptions?.has(listener)) {
this._getAndDeleteFilteredSubscriptions(event, listener).forEach((l) => this.subscriptions.off(l));
this.client._FilteredSubscriptions
.getAndDeleteFilteredSubscriptions(this, event, listener)
.forEach((l) => this.subscriptions.off(l));
return;
}

Expand Down
1 change: 1 addition & 0 deletions src/platform/web/modules.ts
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,5 @@ export * from './modules/realtimepresence';
export * from './modules/transports';
export * from './modules/http';
export { Rest } from '../../common/lib/client/rest';
export { FilteredSubscriptions as MessageInteractions } from '../../common/lib/client/filteredsubscriptions';
export { BaseRest, BaseRealtime, ErrorInfo };
Loading
Loading