diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 1b75fc06e..427b6398e 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -30,7 +30,7 @@ import { ChannelOptions } from '../../types/channel'; import { normaliseChannelOptions } from '../util/defaults'; import { PaginatedResult } from './paginatedresource'; import type { PushChannel } from 'plugins/push'; -import type { LiveObjects } from 'plugins/liveobjects'; +import type { LiveObjects, StateMessage } from 'plugins/liveobjects'; interface RealtimeHistoryParams { start?: number; @@ -604,69 +604,27 @@ class RealtimeChannel extends EventEmitter { break; } - case actions.STATE: { - if (!this._liveObjects) { - return; - } - - const { id, connectionId, timestamp } = message; - const options = this.channelOptions; - - const stateMessages = message.state ?? []; - for (let i = 0; i < stateMessages.length; i++) { - try { - const stateMessage = stateMessages[i]; - - await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); - - if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; - if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; - if (!stateMessage.id) stateMessage.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } - } - - this._liveObjects.handleStateMessages(stateMessages); - - break; - } - + // STATE and STATE_SYNC message processing share most of the logic, so group them together + case actions.STATE: case actions.STATE_SYNC: { if (!this._liveObjects) { return; } - const { id, connectionId, timestamp } = message; + const stateMessages = message.state ?? []; const options = this.channelOptions; + await this._decodeAndPrepareMessages(message, stateMessages, (msg) => + this.client._LiveObjectsPlugin + ? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, decodeData) + : Utils.throwMissingPluginError('LiveObjects'), + ); - const stateMessages = message.state ?? []; - for (let i = 0; i < stateMessages.length; i++) { - try { - const stateMessage = stateMessages[i]; - - await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData); - - if (!stateMessage.connectionId) stateMessage.connectionId = connectionId; - if (!stateMessage.timestamp) stateMessage.timestamp = timestamp; - if (!stateMessage.id) stateMessage.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } + if (message.action === actions.STATE) { + this._liveObjects.handleStateMessages(stateMessages); + } else { + this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial); } - this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial); - break; } @@ -774,7 +732,7 @@ class RealtimeChannel extends EventEmitter { * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided */ - private async _decodeAndPrepareMessages( + private async _decodeAndPrepareMessages( protocolMessage: ProtocolMessage, messages: T[], decodeFn: (msg: T) => Promise,