Skip to content

Commit

Permalink
Merge pull request #1931 from ably/liveobjects/refactor-state-message…
Browse files Browse the repository at this point in the history
…-processing

Use common `_decodeAndPrepareMessages` for processing of `STATE` and `STATE_SYNC` messages
  • Loading branch information
VeskeR authored Dec 4, 2024
2 parents 7da28eb + 6876339 commit f08222d
Showing 1 changed file with 14 additions and 56 deletions.
70 changes: 14 additions & 56 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import { ChannelOptions } from '../../types/channel';
import { normaliseChannelOptions } from '../util/defaults';
import { PaginatedResult } from './paginatedresource';
import type { PushChannel } from 'plugins/push';
import type { LiveObjects } from 'plugins/liveobjects';
import type { LiveObjects, StateMessage } from 'plugins/liveobjects';

interface RealtimeHistoryParams {
start?: number;
Expand Down Expand Up @@ -604,69 +604,27 @@ class RealtimeChannel extends EventEmitter {
break;
}

case actions.STATE: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const options = this.channelOptions;

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
}

this._liveObjects.handleStateMessages(stateMessages);

break;
}

// STATE and STATE_SYNC message processing share most of the logic, so group them together
case actions.STATE:
case actions.STATE_SYNC: {
if (!this._liveObjects) {
return;
}

const { id, connectionId, timestamp } = message;
const stateMessages = message.state ?? [];
const options = this.channelOptions;
await this._decodeAndPrepareMessages(message, stateMessages, (msg) =>
this.client._LiveObjectsPlugin
? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, decodeData)
: Utils.throwMissingPluginError('LiveObjects'),
);

const stateMessages = message.state ?? [];
for (let i = 0; i < stateMessages.length; i++) {
try {
const stateMessage = stateMessages[i];

await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);

if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
if (!stateMessage.id) stateMessage.id = id + ':' + i;
} catch (e) {
Logger.logAction(
this.logger,
Logger.LOG_ERROR,
'RealtimeChannel.processMessage()',
(e as Error).toString(),
);
}
if (message.action === actions.STATE) {
this._liveObjects.handleStateMessages(stateMessages);
} else {
this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial);
}

this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial);

break;
}

Expand Down Expand Up @@ -774,7 +732,7 @@ class RealtimeChannel extends EventEmitter {
* @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding
* and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided
*/
private async _decodeAndPrepareMessages<T extends Message | PresenceMessage>(
private async _decodeAndPrepareMessages<T extends Message | PresenceMessage | StateMessage>(
protocolMessage: ProtocolMessage,
messages: T[],
decodeFn: (msg: T) => Promise<void>,
Expand Down

0 comments on commit f08222d

Please sign in to comment.