diff --git a/src/common/lib/client/auth.ts b/src/common/lib/client/auth.ts index b4db9380c4..c2a0ef0382 100644 --- a/src/common/lib/client/auth.ts +++ b/src/common/lib/client/auth.ts @@ -11,7 +11,6 @@ import ClientOptions from '../../types/ClientOptions'; import HttpMethods from '../../constants/HttpMethods'; import HttpStatusCodes from 'common/constants/HttpStatusCodes'; import Platform from '../../platform'; -import Resource from './resource'; import Defaults from '../util/defaults'; type BatchResult = API.Types.BatchResult; @@ -96,7 +95,7 @@ function basicAuthForced(options: ClientOptions) { } /* RSA4 */ -function useTokenAuth(options: ClientOptions) { +export function useTokenAuth(options: ClientOptions) { return ( options.useTokenAuth || (!basicAuthForced(options) && (options.authCallback || options.authUrl || options.token || options.tokenDetails)) @@ -1038,64 +1037,8 @@ class Auth { revokeTokens( specifiers: TokenRevocationTargetSpecifier[], options?: TokenRevocationOptions - ): Promise; - revokeTokens( - specifiers: TokenRevocationTargetSpecifier[], - optionsOrCallbackArg?: TokenRevocationOptions | StandardCallback, - callbackArg?: StandardCallback - ): void | Promise { - if (useTokenAuth(this.client.options)) { - throw new ErrorInfo('Cannot revoke tokens when using token auth', 40162, 401); - } - - const keyName = this.client.options.keyName!; - - let resolvedOptions: TokenRevocationOptions; - - if (typeof optionsOrCallbackArg === 'function') { - callbackArg = optionsOrCallbackArg; - resolvedOptions = {}; - } else { - resolvedOptions = optionsOrCallbackArg ?? {}; - } - - if (callbackArg === undefined) { - return Utils.promisify(this, 'revokeTokens', [specifiers, resolvedOptions]); - } - - const callback = callbackArg; - - const requestBodyDTO = { - targets: specifiers.map((specifier) => `${specifier.type}:${specifier.value}`), - ...resolvedOptions, - }; - - const format = this.client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, - headers = Defaults.defaultPostHeaders(this.client.options, { format }); - - if (this.client.options.headers) Utils.mixin(headers, this.client.options.headers); - - const requestBody = Utils.encodeBody(requestBodyDTO, this.client._MsgPack, format); - Resource.post( - this.client, - `/keys/${keyName}/revokeTokens`, - requestBody, - headers, - { newBatchResponse: 'true' }, - null, - (err, body, headers, unpacked) => { - if (err) { - callback(err); - return; - } - - const batchResult = ( - unpacked ? body : Utils.decodeBody(body, this.client._MsgPack, format) - ) as TokenRevocationResult; - - callback(null, batchResult); - } - ); + ): Promise { + return this.client.rest.revokeTokens(specifiers, options); } } diff --git a/src/common/lib/client/baseclient.ts b/src/common/lib/client/baseclient.ts index 482075fa23..70eb307bff 100644 --- a/src/common/lib/client/baseclient.ts +++ b/src/common/lib/client/baseclient.ts @@ -103,7 +103,7 @@ class BaseClient { this.__FilteredSubscriptions = modules.MessageInteractions ?? null; } - private get rest(): Rest { + get rest(): Rest { if (!this._rest) { throwMissingModuleError('Rest'); } diff --git a/src/common/lib/client/presence.ts b/src/common/lib/client/presence.ts deleted file mode 100644 index 8acbed68bf..0000000000 --- a/src/common/lib/client/presence.ts +++ /dev/null @@ -1,96 +0,0 @@ -import * as Utils from '../util/utils'; -import EventEmitter from '../util/eventemitter'; -import Logger from '../util/logger'; -import PaginatedResource, { PaginatedResult } from './paginatedresource'; -import PresenceMessage from '../types/presencemessage'; -import { CipherOptions } from '../types/message'; -import { PaginatedResultCallback } from '../../types/utils'; -import Channel from './channel'; -import RealtimeChannel from './realtimechannel'; -import Defaults from '../util/defaults'; - -class Presence extends EventEmitter { - channel: RealtimeChannel | Channel; - basePath: string; - - constructor(channel: RealtimeChannel | Channel) { - super(); - this.channel = channel; - this.basePath = channel.basePath + '/presence'; - } - - get(params: any, callback: PaginatedResultCallback): void | Promise { - Logger.logAction(Logger.LOG_MICRO, 'Presence.get()', 'channel = ' + this.channel.name); - /* params and callback are optional; see if params contains the callback */ - if (callback === undefined) { - if (typeof params == 'function') { - callback = params; - params = null; - } else { - return Utils.promisify(this, 'get', arguments); - } - } - const client = this.channel.client, - format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, - envelope = this.channel.client.http.supportsLinkHeaders ? undefined : format, - headers = Defaults.defaultGetHeaders(client.options, { format }); - - Utils.mixin(headers, client.options.headers); - - const options = this.channel.channelOptions; - new PaginatedResource(client, this.basePath, headers, envelope, async function (body, headers, unpacked) { - return await PresenceMessage.fromResponseBody( - body as Record[], - options as CipherOptions, - client._MsgPack, - unpacked ? undefined : format - ); - }).get(params, callback); - } - - history( - params: any, - callback: PaginatedResultCallback - ): void | Promise> { - Logger.logAction(Logger.LOG_MICRO, 'Presence.history()', 'channel = ' + this.channel.name); - return this._history(params, callback); - } - - _history( - params: any, - callback: PaginatedResultCallback - ): void | Promise> { - /* params and callback are optional; see if params contains the callback */ - if (callback === undefined) { - if (typeof params == 'function') { - callback = params; - params = null; - } else { - return Utils.promisify(this, '_history', [params]); - } - } - - const client = this.channel.client, - format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, - envelope = this.channel.client.http.supportsLinkHeaders ? undefined : format, - headers = Defaults.defaultGetHeaders(client.options, { format }); - - Utils.mixin(headers, client.options.headers); - - const options = this.channel.channelOptions; - new PaginatedResource(client, this.basePath + '/history', headers, envelope, async function ( - body, - headers, - unpacked - ) { - return await PresenceMessage.fromResponseBody( - body as Record[], - options as CipherOptions, - client._MsgPack, - unpacked ? undefined : format - ); - }).get(params, callback); - } -} - -export default Presence; diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 898377dded..e3d054ca15 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -1,7 +1,6 @@ import ProtocolMessage from '../types/protocolmessage'; import EventEmitter from '../util/eventemitter'; import * as Utils from '../util/utils'; -import Channel from './channel'; import Logger from '../util/logger'; import RealtimePresence from './realtimepresence'; import Message, { CipherOptions } from '../types/message'; @@ -14,6 +13,8 @@ import ConnectionManager from '../transport/connectionmanager'; import ConnectionStateChange from './connectionstatechange'; import { ErrCallback, PaginatedResultCallback, StandardCallback } from '../../types/utils'; import BaseRealtime from './baserealtime'; +import { ChannelOptions } from '../../types/channel'; +import { normaliseChannelOptions } from '../util/defaults'; interface RealtimeHistoryParams { start?: number; @@ -48,14 +49,16 @@ function validateChannelOptions(options?: API.Types.ChannelOptions) { } } -class RealtimeChannel extends Channel { - realtime: BaseRealtime; - private _realtimePresence: RealtimePresence | null; +class RealtimeChannel extends EventEmitter { + name: string; + channelOptions: ChannelOptions; + client: BaseRealtime; + private _presence: RealtimePresence | null; get presence(): RealtimePresence { - if (!this._realtimePresence) { + if (!this._presence) { Utils.throwMissingModuleError('RealtimePresence'); } - return this._realtimePresence; + return this._presence; } connectionManager: ConnectionManager; state: API.Types.ChannelState; @@ -86,12 +89,14 @@ class RealtimeChannel extends Channel { retryTimer?: number | NodeJS.Timeout | null; retryCount: number = 0; - constructor(realtime: BaseRealtime, name: string, options?: API.Types.ChannelOptions) { - super(realtime, name, options); + constructor(client: BaseRealtime, name: string, options?: API.Types.ChannelOptions) { + super(); Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel()', 'started; name = ' + name); - this.realtime = realtime; - this._realtimePresence = realtime._RealtimePresence ? new realtime._RealtimePresence(this) : null; - this.connectionManager = realtime.connection.connectionManager; + this.name = name; + this.channelOptions = normaliseChannelOptions(client._Crypto ?? null, options); + this.client = client; + this._presence = client._RealtimePresence ? new client._RealtimePresence(this) : null; + this.connectionManager = client.connection.connectionManager; this.state = 'initialized'; this.subscriptions = new EventEmitter(); this.syncChannelSerial = undefined; @@ -106,7 +111,7 @@ class RealtimeChannel extends Channel { this._attachResume = false; this._decodingContext = { channelOptions: this.channelOptions, - plugins: realtime.options.plugins || {}, + plugins: client.options.plugins || {}, baseEncodedPreviousPayload: undefined, }; this._lastPayload = { @@ -156,7 +161,7 @@ class RealtimeChannel extends Channel { _callback(err); return; } - Channel.prototype.setOptions.call(this, options); + this.channelOptions = normaliseChannelOptions(this.client._Crypto ?? null, options); if (this._decodingContext) this._decodingContext.channelOptions = this.channelOptions; if (this._shouldReattachToSetOptions(options)) { /* This does not just do _attach(true, null, callback) because that would put us @@ -236,7 +241,7 @@ class RealtimeChannel extends Channel { } else { messages = [Message.fromValues({ name: args[0], data: args[1] })]; } - const maxMessageSize = this.realtime.options.maxMessageSize; + const maxMessageSize = this.client.options.maxMessageSize; Message.encodeArray(messages, this.channelOptions as CipherOptions, (err: Error | null) => { if (err) { callback(err); @@ -258,12 +263,11 @@ class RealtimeChannel extends Channel { ); return; } - this.__publish(messages, callback); + this._publish(messages, callback); }); } - // Double underscore used to prevent type conflict with underlying Channel._publish method - __publish(messages: Array, callback: ErrCallback) { + _publish(messages: Array, callback: ErrCallback) { Logger.logAction(Logger.LOG_MICRO, 'RealtimeChannel.publish()', 'message count = ' + messages.length); const state = this.state; switch (state) { @@ -483,7 +487,7 @@ class RealtimeChannel extends Channel { } sendMessage(msg: ProtocolMessage, callback?: ErrCallback): void { - this.connectionManager.send(msg, this.realtime.options.queueMessages, callback); + this.connectionManager.send(msg, this.client.options.queueMessages, callback); } sendPresence(presence: PresenceMessage | PresenceMessage[], callback?: ErrCallback): void { @@ -523,8 +527,8 @@ class RealtimeChannel extends Channel { if (this.state === 'attached') { if (!resumed) { /* On a loss of continuity, the presence set needs to be re-synced */ - if (this._realtimePresence) { - this._realtimePresence.onAttached(hasPresence); + if (this._presence) { + this._presence.onAttached(hasPresence); } } const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error); @@ -583,8 +587,8 @@ class RealtimeChannel extends Channel { Logger.logAction(Logger.LOG_ERROR, 'RealtimeChannel.processMessage()', (e as Error).toString()); } } - if (this._realtimePresence) { - this._realtimePresence.setPresence(presence, isSync, syncChannelSerial as any); + if (this._presence) { + this._presence.setPresence(presence, isSync, syncChannelSerial as any); } break; } @@ -721,8 +725,8 @@ class RealtimeChannel extends Channel { if (state === this.state) { return; } - if (this._realtimePresence) { - this._realtimePresence.actOnChannelState(state, hasPresence, reason); + if (this._presence) { + this._presence.actOnChannelState(state, hasPresence, reason); } if (state === 'suspended' && this.connectionManager.state.sendEvents) { this.startRetryTimer(); @@ -829,7 +833,7 @@ class RealtimeChannel extends Channel { Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel.startStateTimerIfNotRunning', 'timer expired'); this.stateTimer = null; this.timeoutPendingState(); - }, this.realtime.options.timeouts.realtimeRequestTimeout); + }, this.client.options.timeouts.realtimeRequestTimeout); } } @@ -845,7 +849,7 @@ class RealtimeChannel extends Channel { if (this.retryTimer) return; this.retryCount++; - const retryDelay = Utils.getRetryTime(this.realtime.options.timeouts.channelRetryTimeout, this.retryCount); + const retryDelay = Utils.getRetryTime(this.client.options.timeouts.channelRetryTimeout, this.retryCount); this.retryTimer = setTimeout(() => { /* If connection is not connected, just leave in suspended, a reattach @@ -881,6 +885,9 @@ class RealtimeChannel extends Channel { } } + // We fetch this first so that any module-not-provided error takes priority over other errors + const restMixin = this.client.rest.channelMixin; + if (params && params.untilAttach) { if (this.state !== 'attached') { callback(new ErrorInfo('option untilAttach requires the channel to be attached', 40000, 400)); @@ -900,7 +907,7 @@ class RealtimeChannel extends Channel { params.from_serial = this.properties.attachSerial; } - Channel.prototype._history.call(this, params, callback); + return restMixin.history(this, params, callback); } as any; whenState = ((state: string, listener: ErrCallback) => { @@ -934,6 +941,10 @@ class RealtimeChannel extends Channel { this.properties.channelSerial = channelSerial; } } + + status(callback?: StandardCallback): void | Promise { + return this.client.rest.channelMixin.status(this, callback); + } } export default RealtimeChannel; diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 5d5e84c346..0027e22e20 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -1,5 +1,4 @@ import * as Utils from '../util/utils'; -import Presence from './presence'; import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; import PresenceMessage, { fromValues as presenceMessageFromValues } from '../types/presencemessage'; @@ -27,11 +26,11 @@ interface RealtimeHistoryParams { } function getClientId(realtimePresence: RealtimePresence) { - return realtimePresence.channel.realtime.auth.clientId; + return realtimePresence.channel.client.auth.clientId; } function isAnonymousOrWildcard(realtimePresence: RealtimePresence) { - const realtime = realtimePresence.channel.realtime; + const realtime = realtimePresence.channel.client; /* If not currently connected, we can't assume that we're an anonymous * client, as realtime may inform us of our clientId in the CONNECTED * message. So assume we're not anonymous and leave it to realtime to @@ -78,7 +77,7 @@ function newerThan(item: PresenceMessage, existing: PresenceMessage) { } } -class RealtimePresence extends Presence { +class RealtimePresence extends EventEmitter { channel: RealtimeChannel; pendingPresence: { presence: PresenceMessage; callback: ErrCallback }[]; syncComplete: boolean; @@ -88,7 +87,7 @@ class RealtimePresence extends Presence { name?: string; constructor(channel: RealtimeChannel) { - super(channel); + super(); this.channel = channel; this.syncComplete = false; this.members = new PresenceMap(this, (item) => item.clientId + ':' + item.connectionId); @@ -244,8 +243,11 @@ class RealtimePresence extends Presence { } } - // Return type is any to avoid conflict with base Presence class - get(this: RealtimePresence, params: RealtimePresenceParams, callback: StandardCallback): any { + get( + this: RealtimePresence, + params: RealtimePresenceParams, + callback: StandardCallback + ): void | Promise { const args = Array.prototype.slice.call(arguments); if (args.length == 1 && typeof args[0] == 'function') args.unshift(null); @@ -304,6 +306,9 @@ class RealtimePresence extends Presence { } } + // We fetch this first so that any module-not-provided error takes priority over other errors + const restMixin = this.channel.client.rest.presenceMixin; + if (params && params.untilAttach) { if (this.channel.state === 'attached') { delete params.untilAttach; @@ -319,7 +324,7 @@ class RealtimePresence extends Presence { } } - Presence.prototype._history.call(this, params, callback); + return restMixin.history(this, params, callback); } setPresence(presenceSet: PresenceMessage[], isSync: boolean, syncChannelSerial?: string): void { diff --git a/src/common/lib/client/rest.ts b/src/common/lib/client/rest.ts index a3592f9945..c9a26f0061 100644 --- a/src/common/lib/client/rest.ts +++ b/src/common/lib/client/rest.ts @@ -3,7 +3,7 @@ import Logger, { LoggerOptions } from '../util/logger'; import Defaults from '../util/defaults'; import Push from './push'; import PaginatedResource, { HttpPaginatedResponse, PaginatedResult } from './paginatedresource'; -import Channel from './channel'; +import RestChannel from './restchannel'; import ErrorInfo from '../types/errorinfo'; import Stats from '../types/stats'; import HttpMethods from '../../constants/HttpMethods'; @@ -15,8 +15,12 @@ import Resource from './resource'; import Platform from '../../platform'; import BaseClient from './baseclient'; +import { useTokenAuth } from './auth'; +import { RestChannelMixin } from './restchannelmixin'; +import { RestPresenceMixin } from './restpresencemixin'; type BatchResult = API.Types.BatchResult; + type BatchPublishSpec = API.Types.BatchPublishSpec; type BatchPublishSuccessResult = API.Types.BatchPublishSuccessResult; type BatchPublishFailureResult = API.Types.BatchPublishFailureResult; @@ -25,12 +29,21 @@ type BatchPresenceSuccessResult = API.Types.BatchPresenceSuccessResult; type BatchPresenceFailureResult = API.Types.BatchPresenceFailureResult; type BatchPresenceResult = BatchResult; +type TokenRevocationTargetSpecifier = API.Types.TokenRevocationTargetSpecifier; +type TokenRevocationOptions = API.Types.TokenRevocationOptions; +type TokenRevocationSuccessResult = API.Types.TokenRevocationSuccessResult; +type TokenRevocationFailureResult = API.Types.TokenRevocationFailureResult; +type TokenRevocationResult = BatchResult; + const noop = function () {}; export class Rest { private readonly client: BaseClient; readonly channels: Channels; readonly push: Push; + readonly channelMixin = RestChannelMixin; + readonly presenceMixin = RestPresenceMixin; + constructor(client: BaseClient) { this.client = client; this.channels = new Channels(this.client); @@ -261,6 +274,54 @@ export class Rest { ); } + revokeTokens( + specifiers: TokenRevocationTargetSpecifier[], + options?: TokenRevocationOptions + ): Promise { + if (useTokenAuth(this.client.options)) { + throw new ErrorInfo('Cannot revoke tokens when using token auth', 40162, 401); + } + + const keyName = this.client.options.keyName!; + + let resolvedOptions = options ?? {}; + + const requestBodyDTO = { + targets: specifiers.map((specifier) => `${specifier.type}:${specifier.value}`), + ...resolvedOptions, + }; + + const format = this.client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, + headers = Defaults.defaultPostHeaders(this.client.options, { format }); + + if (this.client.options.headers) Utils.mixin(headers, this.client.options.headers); + + const requestBody = Utils.encodeBody(requestBodyDTO, this.client._MsgPack, format); + + return new Promise((resolve, reject) => { + Resource.post( + this.client, + `/keys/${keyName}/revokeTokens`, + requestBody, + headers, + { newBatchResponse: 'true' }, + null, + (err, body, headers, unpacked) => { + if (err) { + reject(err); + return; + } + + const batchResult = ( + unpacked ? body : Utils.decodeBody(body, this.client._MsgPack, format) + ) as TokenRevocationResult; + + resolve(batchResult); + } + ); + }); + } + setLog(logOptions: LoggerOptions): void { Logger.setLog(logOptions.level, logOptions.handler); } @@ -268,7 +329,7 @@ export class Rest { class Channels { client: BaseClient; - all: Record; + all: Record; constructor(client: BaseClient) { this.client = client; @@ -279,7 +340,7 @@ class Channels { name = String(name); let channel = this.all[name]; if (!channel) { - this.all[name] = channel = new Channel(this.client, name, channelOptions); + this.all[name] = channel = new RestChannel(this.client, name, channelOptions); } else if (channelOptions) { channel.setOptions(channelOptions); } diff --git a/src/common/lib/client/channel.ts b/src/common/lib/client/restchannel.ts similarity index 58% rename from src/common/lib/client/channel.ts rename to src/common/lib/client/restchannel.ts index 2922de7adb..6197c65191 100644 --- a/src/common/lib/client/channel.ts +++ b/src/common/lib/client/restchannel.ts @@ -1,26 +1,16 @@ import * as Utils from '../util/utils'; -import EventEmitter from '../util/eventemitter'; import Logger from '../util/logger'; -import Presence from './presence'; +import RestPresence from './restpresence'; import Message, { CipherOptions } from '../types/message'; import ErrorInfo from '../types/errorinfo'; -import PaginatedResource, { PaginatedResult } from './paginatedresource'; +import { PaginatedResult } from './paginatedresource'; import Resource, { ResourceCallback } from './resource'; import { ChannelOptions } from '../../types/channel'; import { PaginatedResultCallback, StandardCallback } from '../../types/utils'; -import BaseClient from './baseclient'; +import BaseRest from './baseclient'; import * as API from '../../../../ably'; -import Defaults from '../util/defaults'; -import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; - -interface RestHistoryParams { - start?: number; - end?: number; - direction?: string; - limit?: number; -} - -function noop() {} +import Defaults, { normaliseChannelOptions } from '../util/defaults'; +import { RestHistoryParams } from './restchannelmixin'; const MSG_ID_ENTROPY_BYTES = 9; @@ -30,39 +20,17 @@ function allEmptyIds(messages: Array) { }); } -function normaliseChannelOptions(Crypto: IUntypedCryptoStatic | null, options?: ChannelOptions) { - const channelOptions = options || {}; - if (channelOptions.cipher) { - if (!Crypto) Utils.throwMissingModuleError('Crypto'); - const cipher = Crypto.getCipher(channelOptions.cipher); - channelOptions.cipher = cipher.cipherParams; - channelOptions.channelCipher = cipher.cipher; - } else if ('cipher' in channelOptions) { - /* Don't deactivate an existing cipher unless options - * has a 'cipher' key that's falsey */ - channelOptions.cipher = undefined; - channelOptions.channelCipher = null; - } - return channelOptions; -} - -class Channel extends EventEmitter { - client: BaseClient; +class RestChannel { + client: BaseRest; name: string; - basePath: string; - private _presence: Presence; - get presence(): Presence { - return this._presence; - } + presence: RestPresence; channelOptions: ChannelOptions; - constructor(client: BaseClient, name: string, channelOptions?: ChannelOptions) { - super(); - Logger.logAction(Logger.LOG_MINOR, 'Channel()', 'started; name = ' + name); - this.client = client; + constructor(client: BaseRest, name: string, channelOptions?: ChannelOptions) { + Logger.logAction(Logger.LOG_MINOR, 'RestChannel()', 'started; name = ' + name); this.name = name; - this.basePath = '/channels/' + encodeURIComponent(name); - this._presence = new Presence(this); + this.client = client; + this.presence = new RestPresence(this); this.channelOptions = normaliseChannelOptions(client._Crypto ?? null, channelOptions); } @@ -74,7 +42,7 @@ class Channel extends EventEmitter { params: RestHistoryParams | null, callback: PaginatedResultCallback ): Promise> | void { - Logger.logAction(Logger.LOG_MICRO, 'Channel.history()', 'channel = ' + this.name); + Logger.logAction(Logger.LOG_MICRO, 'RestChannel.history()', 'channel = ' + this.name); /* params and callback are optional; see if params contains the callback */ if (callback === undefined) { if (typeof params == 'function') { @@ -85,25 +53,7 @@ class Channel extends EventEmitter { } } - this._history(params, callback); - } - - _history(params: RestHistoryParams | null, callback: PaginatedResultCallback): void { - const client = this.client, - format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, - envelope = this.client.http.supportsLinkHeaders ? undefined : format, - headers = Defaults.defaultGetHeaders(client.options, { format }); - - Utils.mixin(headers, client.options.headers); - - const options = this.channelOptions; - new PaginatedResource(client, this.basePath + '/messages', headers, envelope, async function ( - body, - headers, - unpacked - ) { - return await Message.fromResponseBody(body as Message[], options, client._MsgPack, unpacked ? undefined : format); - }).get(params as Record, callback); + this.client.rest.channelMixin.history(this, params, callback); } publish(): void | Promise { @@ -185,19 +135,20 @@ class Channel extends EventEmitter { } _publish(requestBody: unknown, headers: Record, params: any, callback: ResourceCallback): void { - Resource.post(this.client, this.basePath + '/messages', requestBody, headers, params, null, callback); + Resource.post( + this.client, + this.client.rest.channelMixin.basePath(this) + '/messages', + requestBody, + headers, + params, + null, + callback + ); } status(callback?: StandardCallback): void | Promise { - if (typeof callback !== 'function') { - return Utils.promisify(this, 'status', []); - } - - const format = this.client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json; - const headers = Defaults.defaultPostHeaders(this.client.options, { format }); - - Resource.get(this.client, this.basePath, headers, {}, format, callback || noop); + return this.client.rest.channelMixin.status(this, callback); } } -export default Channel; +export default RestChannel; diff --git a/src/common/lib/client/restchannelmixin.ts b/src/common/lib/client/restchannelmixin.ts new file mode 100644 index 0000000000..9986dc4e74 --- /dev/null +++ b/src/common/lib/client/restchannelmixin.ts @@ -0,0 +1,67 @@ +import * as API from '../../../../ably'; +import RestChannel from './restchannel'; +import RealtimeChannel from './realtimechannel'; +import * as Utils from '../util/utils'; +import { PaginatedResultCallback, StandardCallback } from '../../types/utils'; +import Message from '../types/message'; +import Defaults from '../util/defaults'; +import PaginatedResource from './paginatedresource'; +import Resource from './resource'; + +export interface RestHistoryParams { + start?: number; + end?: number; + direction?: string; + limit?: number; +} + +const noop = function () {}; + +export class RestChannelMixin { + static basePath(channel: RestChannel | RealtimeChannel) { + return '/channels/' + encodeURIComponent(channel.name); + } + + static history( + channel: RestChannel | RealtimeChannel, + params: RestHistoryParams | null, + callback: PaginatedResultCallback + ): void { + const client = channel.client, + format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, + envelope = channel.client.http.supportsLinkHeaders ? undefined : format, + headers = Defaults.defaultGetHeaders(client.options, { format }); + + Utils.mixin(headers, client.options.headers); + + const options = channel.channelOptions; + new PaginatedResource(client, this.basePath(channel) + '/messages', headers, envelope, async function ( + body, + headers, + unpacked + ) { + return await Message.fromResponseBody(body as Message[], options, client._MsgPack, unpacked ? undefined : format); + }).get(params as Record, callback); + } + + static status( + channel: RestChannel | RealtimeChannel, + callback?: StandardCallback + ): void | Promise { + if (typeof callback !== 'function') { + return Utils.promisify(this, 'status', [channel]); + } + + const format = channel.client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json; + const headers = Defaults.defaultPostHeaders(channel.client.options, { format }); + + Resource.get( + channel.client, + this.basePath(channel), + headers, + {}, + format, + callback || noop + ); + } +} diff --git a/src/common/lib/client/restpresence.ts b/src/common/lib/client/restpresence.ts new file mode 100644 index 0000000000..7aec14dcb2 --- /dev/null +++ b/src/common/lib/client/restpresence.ts @@ -0,0 +1,61 @@ +import * as Utils from '../util/utils'; +import Logger from '../util/logger'; +import PaginatedResource, { PaginatedResult } from './paginatedresource'; +import PresenceMessage from '../types/presencemessage'; +import { CipherOptions } from '../types/message'; +import { PaginatedResultCallback } from '../../types/utils'; +import RestChannel from './restchannel'; +import Defaults from '../util/defaults'; + +class RestPresence { + channel: RestChannel; + + constructor(channel: RestChannel) { + this.channel = channel; + } + + get(params: any, callback: PaginatedResultCallback): void | Promise { + Logger.logAction(Logger.LOG_MICRO, 'RestPresence.get()', 'channel = ' + this.channel.name); + /* params and callback are optional; see if params contains the callback */ + if (callback === undefined) { + if (typeof params == 'function') { + callback = params; + params = null; + } else { + return Utils.promisify(this, 'get', arguments); + } + } + const client = this.channel.client, + format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, + envelope = this.channel.client.http.supportsLinkHeaders ? undefined : format, + headers = Defaults.defaultGetHeaders(client.options, { format }); + + Utils.mixin(headers, client.options.headers); + + const options = this.channel.channelOptions; + new PaginatedResource( + client, + this.channel.client.rest.presenceMixin.basePath(this), + headers, + envelope, + async function (body, headers, unpacked) { + return await PresenceMessage.fromResponseBody( + body as Record[], + options as CipherOptions, + client._MsgPack, + unpacked ? undefined : format + ); + } + ).get(params, callback); + } + + history( + params: any, + callback: PaginatedResultCallback + ): void | Promise> { + Logger.logAction(Logger.LOG_MICRO, 'RestPresence.history()', 'channel = ' + this.channel.name); + return this.channel.client.rest.presenceMixin.history(this, params, callback); + } +} + +export default RestPresence; diff --git a/src/common/lib/client/restpresencemixin.ts b/src/common/lib/client/restpresencemixin.ts new file mode 100644 index 0000000000..7e600fdee3 --- /dev/null +++ b/src/common/lib/client/restpresencemixin.ts @@ -0,0 +1,52 @@ +import RestPresence from './restpresence'; +import RealtimePresence from './realtimepresence'; +import * as Utils from '../util/utils'; +import { PaginatedResultCallback } from '../../types/utils'; +import Defaults from '../util/defaults'; +import PaginatedResource, { PaginatedResult } from './paginatedresource'; +import PresenceMessage from '../types/presencemessage'; +import { CipherOptions } from '../types/message'; +import { RestChannelMixin } from './restchannelmixin'; + +export class RestPresenceMixin { + static basePath(presence: RestPresence | RealtimePresence) { + return RestChannelMixin.basePath(presence.channel) + '/presence'; + } + + static history( + presence: RestPresence | RealtimePresence, + params: any, + callback: PaginatedResultCallback + ): void | Promise> { + /* params and callback are optional; see if params contains the callback */ + if (callback === undefined) { + if (typeof params == 'function') { + callback = params; + params = null; + } else { + return Utils.promisify(this, 'history', [presence, params]); + } + } + + const client = presence.channel.client, + format = client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json, + envelope = presence.channel.client.http.supportsLinkHeaders ? undefined : format, + headers = Defaults.defaultGetHeaders(client.options, { format }); + + Utils.mixin(headers, client.options.headers); + + const options = presence.channel.channelOptions; + new PaginatedResource(client, this.basePath(presence) + '/history', headers, envelope, async function ( + body, + headers, + unpacked + ) { + return await PresenceMessage.fromResponseBody( + body as Record[], + options as CipherOptions, + client._MsgPack, + unpacked ? undefined : format + ); + }).get(params, callback); + } +} diff --git a/src/common/lib/util/defaults.ts b/src/common/lib/util/defaults.ts index 9939c975c7..489d651307 100644 --- a/src/common/lib/util/defaults.ts +++ b/src/common/lib/util/defaults.ts @@ -6,6 +6,8 @@ import { version } from '../../../../package.json'; import ClientOptions, { InternalClientOptions, NormalisedClientOptions } from 'common/types/ClientOptions'; import IDefaults from '../../types/IDefaults'; import { MsgPack } from 'common/types/msgpack'; +import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; +import { ChannelOptions } from 'common/types/channel'; let agent = 'ably-js/' + version; @@ -265,6 +267,22 @@ export function normaliseOptions(options: InternalClientOptions, MsgPack: MsgPac }; } +export function normaliseChannelOptions(Crypto: IUntypedCryptoStatic | null, options?: ChannelOptions) { + const channelOptions = options || {}; + if (channelOptions.cipher) { + if (!Crypto) Utils.throwMissingModuleError('Crypto'); + const cipher = Crypto.getCipher(channelOptions.cipher); + channelOptions.cipher = cipher.cipherParams; + channelOptions.channelCipher = cipher.cipher; + } else if ('cipher' in channelOptions) { + /* Don't deactivate an existing cipher unless options + * has a 'cipher' key that's falsey */ + channelOptions.cipher = undefined; + channelOptions.channelCipher = null; + } + return channelOptions; +} + const contentTypes = { json: 'application/json', xml: 'application/xml', diff --git a/test/browser/modules.test.js b/test/browser/modules.test.js index bf56cf4fda..f2ba4b77cb 100644 --- a/test/browser/modules.test.js +++ b/test/browser/modules.test.js @@ -31,12 +31,14 @@ describe('browser/modules', function () { let loadTestData; let testMessageEquality; let randomString; + let getTestApp; before((done) => { ablyClientOptions = window.ablyHelpers.ablyClientOptions; testResourcesPath = window.ablyHelpers.testResourcesPath; testMessageEquality = window.ablyHelpers.testMessageEquality; randomString = window.ablyHelpers.randomString; + getTestApp = window.ablyHelpers.getTestApp; loadTestData = async (dataPath) => { return new Promise((resolve, reject) => { @@ -60,27 +62,121 @@ describe('browser/modules', function () { }); describe('Rest', () => { + const restScenarios = [ + { + description: 'use push admin functionality', + action: (client) => client.push.admin.publish({ clientId: 'foo' }, { data: { bar: 'baz' } }), + }, + { description: 'call `time()`', action: (client) => client.time() }, + { description: 'call `request(...)`', action: (client) => client.request('get', '/channels/channel', 2) }, + { + description: 'call `batchPublish(...)`', + action: (client) => client.batchPublish({ channels: ['channel'], messages: { data: { foo: 'bar' } } }), + }, + { + description: 'call `batchPresence(...)`', + action: (client) => client.batchPresence(['channel']), + }, + { + description: 'call `auth.revokeTokens(...)`', + getAdditionalClientOptions: () => { + const testApp = getTestApp(); + return { key: testApp.keys[4].keyStr /* this key has revocableTokens enabled */ }; + }, + action: (client) => client.auth.revokeTokens([{ type: 'clientId', value: 'foo' }]), + }, + { + description: 'call channel’s `history()`', + action: (client) => client.channels.get('channel').history(), + }, + { + description: 'call channel’s `presence.history()`', + additionalRealtimeModules: { RealtimePresence }, + action: (client) => client.channels.get('channel').presence.history(), + }, + { + description: 'call channel’s `status()`', + action: (client) => client.channels.get('channel').status(), + }, + ]; + describe('BaseRest without explicit Rest', () => { - it('offers REST functionality', async () => { - const client = new BaseRest(ablyClientOptions(), { FetchRequest }); - const time = await client.time(); - expect(time).to.be.a('number'); - }); + for (const scenario of restScenarios) { + it(`allows you to ${scenario.description}`, async () => { + const client = new BaseRest(ablyClientOptions(scenario.getAdditionalClientOptions?.()), { FetchRequest }); + + let thrownError = null; + try { + await scenario.action(client); + } catch (error) { + thrownError = error; + } + + expect(thrownError).to.be.null; + }); + } }); describe('BaseRealtime with Rest', () => { - it('offers REST functionality', async () => { - const client = new BaseRealtime(ablyClientOptions(), { WebSocketTransport, FetchRequest, Rest }); - const time = await client.time(); - expect(time).to.be.a('number'); - }); + for (const scenario of restScenarios) { + it(`allows you to ${scenario.description}`, async () => { + const client = new BaseRealtime(ablyClientOptions(scenario.getAdditionalClientOptions?.()), { + WebSocketTransport, + FetchRequest, + Rest, + ...scenario.additionalRealtimeModules, + }); + + let thrownError = null; + try { + await scenario.action(client); + } catch (error) { + thrownError = error; + } + + expect(thrownError).to.be.null; + }); + } }); describe('BaseRealtime without Rest', () => { - it('throws an error when attempting to use REST functionality', async () => { + it('still allows publishing and subscribing', async () => { const client = new BaseRealtime(ablyClientOptions(), { WebSocketTransport, FetchRequest }); - expect(() => client.time()).to.throw('Rest module not provided'); + + const channel = client.channels.get('channel'); + await channel.attach(); + + const recievedMessagePromise = new Promise((resolve) => { + channel.subscribe((message) => { + resolve(message); + }); + }); + + await channel.publish({ data: { foo: 'bar' } }); + + const receivedMessage = await recievedMessagePromise; + expect(receivedMessage.data).to.eql({ foo: 'bar' }); }); + + for (const scenario of restScenarios) { + it(`throws an error when attempting to ${scenario.description}`, async () => { + const client = new BaseRealtime(ablyClientOptions(scenario.getAdditionalClientOptions?.()), { + WebSocketTransport, + FetchRequest, + ...scenario.additionalRealtimeModules, + }); + + let thrownError = null; + try { + await scenario.action(client); + } catch (error) { + thrownError = error; + } + + expect(thrownError).not.to.be.null; + expect(thrownError.message).to.equal('Rest module not provided'); + }); + } }); });