Skip to content

Commit

Permalink
Merge pull request #1496 from ably/1489-remove-remaining-REST-code-fr…
Browse files Browse the repository at this point in the history
…om-BaseRealtime

[SDK-3936] Remove remaining REST code from `BaseRealtime`
  • Loading branch information
lawrence-forooghian authored Nov 21, 2023
2 parents f378984 + b75eb5b commit cdcf043
Show file tree
Hide file tree
Showing 12 changed files with 450 additions and 281 deletions.
63 changes: 3 additions & 60 deletions src/common/lib/client/auth.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import ClientOptions from '../../types/ClientOptions';
import HttpMethods from '../../constants/HttpMethods';
import HttpStatusCodes from 'common/constants/HttpStatusCodes';
import Platform from '../../platform';
import Resource from './resource';
import Defaults from '../util/defaults';

type BatchResult<T> = API.Types.BatchResult<T>;
Expand Down Expand Up @@ -96,7 +95,7 @@ function basicAuthForced(options: ClientOptions) {
}

/* RSA4 */
function useTokenAuth(options: ClientOptions) {
export function useTokenAuth(options: ClientOptions) {
return (
options.useTokenAuth ||
(!basicAuthForced(options) && (options.authCallback || options.authUrl || options.token || options.tokenDetails))
Expand Down Expand Up @@ -1038,64 +1037,8 @@ class Auth {
revokeTokens(
specifiers: TokenRevocationTargetSpecifier[],
options?: TokenRevocationOptions
): Promise<TokenRevocationResult>;
revokeTokens(
specifiers: TokenRevocationTargetSpecifier[],
optionsOrCallbackArg?: TokenRevocationOptions | StandardCallback<TokenRevocationResult>,
callbackArg?: StandardCallback<TokenRevocationResult>
): void | Promise<TokenRevocationResult> {
if (useTokenAuth(this.client.options)) {
throw new ErrorInfo('Cannot revoke tokens when using token auth', 40162, 401);
}

const keyName = this.client.options.keyName!;

let resolvedOptions: TokenRevocationOptions;

if (typeof optionsOrCallbackArg === 'function') {
callbackArg = optionsOrCallbackArg;
resolvedOptions = {};
} else {
resolvedOptions = optionsOrCallbackArg ?? {};
}

if (callbackArg === undefined) {
return Utils.promisify(this, 'revokeTokens', [specifiers, resolvedOptions]);
}

const callback = callbackArg;

const requestBodyDTO = {
targets: specifiers.map((specifier) => `${specifier.type}:${specifier.value}`),
...resolvedOptions,
};

const format = this.client.options.useBinaryProtocol ? Utils.Format.msgpack : Utils.Format.json,
headers = Defaults.defaultPostHeaders(this.client.options, { format });

if (this.client.options.headers) Utils.mixin(headers, this.client.options.headers);

const requestBody = Utils.encodeBody(requestBodyDTO, this.client._MsgPack, format);
Resource.post(
this.client,
`/keys/${keyName}/revokeTokens`,
requestBody,
headers,
{ newBatchResponse: 'true' },
null,
(err, body, headers, unpacked) => {
if (err) {
callback(err);
return;
}

const batchResult = (
unpacked ? body : Utils.decodeBody(body, this.client._MsgPack, format)
) as TokenRevocationResult;

callback(null, batchResult);
}
);
): Promise<TokenRevocationResult> {
return this.client.rest.revokeTokens(specifiers, options);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ class BaseClient {
this.__FilteredSubscriptions = modules.MessageInteractions ?? null;
}

private get rest(): Rest {
get rest(): Rest {
if (!this._rest) {
throwMissingModuleError('Rest');
}
Expand Down
96 changes: 0 additions & 96 deletions src/common/lib/client/presence.ts

This file was deleted.

65 changes: 38 additions & 27 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import ProtocolMessage from '../types/protocolmessage';
import EventEmitter from '../util/eventemitter';
import * as Utils from '../util/utils';
import Channel from './channel';
import Logger from '../util/logger';
import RealtimePresence from './realtimepresence';
import Message, { CipherOptions } from '../types/message';
Expand All @@ -14,6 +13,8 @@ import ConnectionManager from '../transport/connectionmanager';
import ConnectionStateChange from './connectionstatechange';
import { ErrCallback, PaginatedResultCallback, StandardCallback } from '../../types/utils';
import BaseRealtime from './baserealtime';
import { ChannelOptions } from '../../types/channel';
import { normaliseChannelOptions } from '../util/defaults';

interface RealtimeHistoryParams {
start?: number;
Expand Down Expand Up @@ -48,14 +49,16 @@ function validateChannelOptions(options?: API.Types.ChannelOptions) {
}
}

class RealtimeChannel extends Channel {
realtime: BaseRealtime;
private _realtimePresence: RealtimePresence | null;
class RealtimeChannel extends EventEmitter {
name: string;
channelOptions: ChannelOptions;
client: BaseRealtime;
private _presence: RealtimePresence | null;
get presence(): RealtimePresence {
if (!this._realtimePresence) {
if (!this._presence) {
Utils.throwMissingModuleError('RealtimePresence');
}
return this._realtimePresence;
return this._presence;
}
connectionManager: ConnectionManager;
state: API.Types.ChannelState;
Expand Down Expand Up @@ -86,12 +89,14 @@ class RealtimeChannel extends Channel {
retryTimer?: number | NodeJS.Timeout | null;
retryCount: number = 0;

constructor(realtime: BaseRealtime, name: string, options?: API.Types.ChannelOptions) {
super(realtime, name, options);
constructor(client: BaseRealtime, name: string, options?: API.Types.ChannelOptions) {
super();
Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel()', 'started; name = ' + name);
this.realtime = realtime;
this._realtimePresence = realtime._RealtimePresence ? new realtime._RealtimePresence(this) : null;
this.connectionManager = realtime.connection.connectionManager;
this.name = name;
this.channelOptions = normaliseChannelOptions(client._Crypto ?? null, options);
this.client = client;
this._presence = client._RealtimePresence ? new client._RealtimePresence(this) : null;
this.connectionManager = client.connection.connectionManager;
this.state = 'initialized';
this.subscriptions = new EventEmitter();
this.syncChannelSerial = undefined;
Expand All @@ -106,7 +111,7 @@ class RealtimeChannel extends Channel {
this._attachResume = false;
this._decodingContext = {
channelOptions: this.channelOptions,
plugins: realtime.options.plugins || {},
plugins: client.options.plugins || {},
baseEncodedPreviousPayload: undefined,
};
this._lastPayload = {
Expand Down Expand Up @@ -156,7 +161,7 @@ class RealtimeChannel extends Channel {
_callback(err);
return;
}
Channel.prototype.setOptions.call(this, options);
this.channelOptions = normaliseChannelOptions(this.client._Crypto ?? null, options);
if (this._decodingContext) this._decodingContext.channelOptions = this.channelOptions;
if (this._shouldReattachToSetOptions(options)) {
/* This does not just do _attach(true, null, callback) because that would put us
Expand Down Expand Up @@ -236,7 +241,7 @@ class RealtimeChannel extends Channel {
} else {
messages = [Message.fromValues({ name: args[0], data: args[1] })];
}
const maxMessageSize = this.realtime.options.maxMessageSize;
const maxMessageSize = this.client.options.maxMessageSize;
Message.encodeArray(messages, this.channelOptions as CipherOptions, (err: Error | null) => {
if (err) {
callback(err);
Expand All @@ -258,12 +263,11 @@ class RealtimeChannel extends Channel {
);
return;
}
this.__publish(messages, callback);
this._publish(messages, callback);
});
}

// Double underscore used to prevent type conflict with underlying Channel._publish method
__publish(messages: Array<Message>, callback: ErrCallback) {
_publish(messages: Array<Message>, callback: ErrCallback) {
Logger.logAction(Logger.LOG_MICRO, 'RealtimeChannel.publish()', 'message count = ' + messages.length);
const state = this.state;
switch (state) {
Expand Down Expand Up @@ -483,7 +487,7 @@ class RealtimeChannel extends Channel {
}

sendMessage(msg: ProtocolMessage, callback?: ErrCallback): void {
this.connectionManager.send(msg, this.realtime.options.queueMessages, callback);
this.connectionManager.send(msg, this.client.options.queueMessages, callback);
}

sendPresence(presence: PresenceMessage | PresenceMessage[], callback?: ErrCallback): void {
Expand Down Expand Up @@ -523,8 +527,8 @@ class RealtimeChannel extends Channel {
if (this.state === 'attached') {
if (!resumed) {
/* On a loss of continuity, the presence set needs to be re-synced */
if (this._realtimePresence) {
this._realtimePresence.onAttached(hasPresence);
if (this._presence) {
this._presence.onAttached(hasPresence);
}
}
const change = new ChannelStateChange(this.state, this.state, resumed, hasBacklog, message.error);
Expand Down Expand Up @@ -583,8 +587,8 @@ class RealtimeChannel extends Channel {
Logger.logAction(Logger.LOG_ERROR, 'RealtimeChannel.processMessage()', (e as Error).toString());
}
}
if (this._realtimePresence) {
this._realtimePresence.setPresence(presence, isSync, syncChannelSerial as any);
if (this._presence) {
this._presence.setPresence(presence, isSync, syncChannelSerial as any);
}
break;
}
Expand Down Expand Up @@ -721,8 +725,8 @@ class RealtimeChannel extends Channel {
if (state === this.state) {
return;
}
if (this._realtimePresence) {
this._realtimePresence.actOnChannelState(state, hasPresence, reason);
if (this._presence) {
this._presence.actOnChannelState(state, hasPresence, reason);
}
if (state === 'suspended' && this.connectionManager.state.sendEvents) {
this.startRetryTimer();
Expand Down Expand Up @@ -829,7 +833,7 @@ class RealtimeChannel extends Channel {
Logger.logAction(Logger.LOG_MINOR, 'RealtimeChannel.startStateTimerIfNotRunning', 'timer expired');
this.stateTimer = null;
this.timeoutPendingState();
}, this.realtime.options.timeouts.realtimeRequestTimeout);
}, this.client.options.timeouts.realtimeRequestTimeout);
}
}

Expand All @@ -845,7 +849,7 @@ class RealtimeChannel extends Channel {
if (this.retryTimer) return;

this.retryCount++;
const retryDelay = Utils.getRetryTime(this.realtime.options.timeouts.channelRetryTimeout, this.retryCount);
const retryDelay = Utils.getRetryTime(this.client.options.timeouts.channelRetryTimeout, this.retryCount);

this.retryTimer = setTimeout(() => {
/* If connection is not connected, just leave in suspended, a reattach
Expand Down Expand Up @@ -881,6 +885,9 @@ class RealtimeChannel extends Channel {
}
}

// We fetch this first so that any module-not-provided error takes priority over other errors
const restMixin = this.client.rest.channelMixin;

if (params && params.untilAttach) {
if (this.state !== 'attached') {
callback(new ErrorInfo('option untilAttach requires the channel to be attached', 40000, 400));
Expand All @@ -900,7 +907,7 @@ class RealtimeChannel extends Channel {
params.from_serial = this.properties.attachSerial;
}

Channel.prototype._history.call(this, params, callback);
return restMixin.history(this, params, callback);
} as any;

whenState = ((state: string, listener: ErrCallback) => {
Expand Down Expand Up @@ -934,6 +941,10 @@ class RealtimeChannel extends Channel {
this.properties.channelSerial = channelSerial;
}
}

status(callback?: StandardCallback<API.Types.ChannelDetails>): void | Promise<API.Types.ChannelDetails> {
return this.client.rest.channelMixin.status(this, callback);
}
}

export default RealtimeChannel;
Loading

0 comments on commit cdcf043

Please sign in to comment.