diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 4cdcd2312..b7ea4e577 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter { } } - this._liveObjects.handleStateMessages(stateMessages); + this._liveObjects.handleStateMessages(stateMessages, message.channelSerial); break; } diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index a8c3db683..61d17d9a2 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -1,6 +1,7 @@ import { LiveObject, LiveObjectData } from './liveobject'; import { LiveObjects } from './liveobjects'; -import { StateCounter, StateCounterOp, StateOperation, StateOperationAction } from './statemessage'; +import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage'; +import { Timeserial } from './timeserial'; export interface LiveCounterData extends LiveObjectData { data: number; @@ -12,8 +13,9 @@ export class LiveCounter extends LiveObject { private _created: boolean, initialData?: LiveCounterData | null, objectId?: string, + regionalTimeserial?: Timeserial, ) { - super(liveObjects, initialData, objectId); + super(liveObjects, initialData, objectId, regionalTimeserial); } /** @@ -21,8 +23,13 @@ export class LiveCounter extends LiveObject { * * @internal */ - static zeroValue(liveobjects: LiveObjects, isCreated: boolean, objectId?: string): LiveCounter { - return new LiveCounter(liveobjects, isCreated, null, objectId); + static zeroValue( + liveobjects: LiveObjects, + isCreated: boolean, + objectId?: string, + regionalTimeserial?: Timeserial, + ): LiveCounter { + return new LiveCounter(liveobjects, isCreated, null, objectId, regionalTimeserial); } value(): number { @@ -46,7 +53,7 @@ export class LiveCounter extends LiveObject { /** * @internal */ - applyOperation(op: StateOperation): void { + applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void { if (op.objectId !== this.getObjectId()) { throw new this._client.ErrorInfo( `Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`, @@ -75,6 +82,8 @@ export class LiveCounter extends LiveObject { 500, ); } + + this.setRegionalTimeserial(opRegionalTimeserial); } protected _getZeroValueData(): LiveCounterData { diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index f447795aa..42d8248e7 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -46,8 +46,9 @@ export class LiveMap extends LiveObject { private _semantics: MapSemantics, initialData?: LiveMapData | null, objectId?: string, + regionalTimeserial?: Timeserial, ) { - super(liveObjects, initialData, objectId); + super(liveObjects, initialData, objectId, regionalTimeserial); } /** @@ -55,8 +56,8 @@ export class LiveMap extends LiveObject { * * @internal */ - static zeroValue(liveobjects: LiveObjects, objectId?: string): LiveMap { - return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId); + static zeroValue(liveobjects: LiveObjects, objectId?: string, regionalTimeserial?: Timeserial): LiveMap { + return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId, regionalTimeserial); } static liveMapDataFromMapEntries(client: BaseClient, entries: Record): LiveMapData { @@ -134,7 +135,7 @@ export class LiveMap extends LiveObject { /** * @internal */ - applyOperation(op: StateOperation, msg: StateMessage): void { + applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void { if (op.objectId !== this.getObjectId()) { throw new this._client.ErrorInfo( `Cannot apply state operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`, @@ -171,6 +172,8 @@ export class LiveMap extends LiveObject { 500, ); } + + this.setRegionalTimeserial(opRegionalTimeserial); } protected _getZeroValueData(): LiveMapData { diff --git a/src/plugins/liveobjects/liveobject.ts b/src/plugins/liveobjects/liveobject.ts index 2e33cb0b2..70a294779 100644 --- a/src/plugins/liveobjects/liveobject.ts +++ b/src/plugins/liveobjects/liveobject.ts @@ -1,6 +1,7 @@ import type BaseClient from 'common/lib/client/baseclient'; import { LiveObjects } from './liveobjects'; import { StateMessage, StateOperation } from './statemessage'; +import { DefaultTimeserial, Timeserial } from './timeserial'; export interface LiveObjectData { data: any; @@ -10,16 +11,19 @@ export abstract class LiveObject { protected _client: BaseClient; protected _dataRef: T; protected _objectId: string; - protected _regionalTimeserial?: string; + protected _regionalTimeserial: Timeserial; constructor( protected _liveObjects: LiveObjects, initialData?: T | null, objectId?: string, + regionalTimeserial?: Timeserial, ) { this._client = this._liveObjects.getClient(); this._dataRef = initialData ?? this._getZeroValueData(); this._objectId = objectId ?? this._createObjectId(); + // use zero value timeserial by default, so any future operation can be applied for this object + this._regionalTimeserial = regionalTimeserial ?? DefaultTimeserial.zeroValueTimeserial(this._client); } /** @@ -32,7 +36,7 @@ export abstract class LiveObject { /** * @internal */ - getRegionalTimeserial(): string | undefined { + getRegionalTimeserial(): Timeserial { return this._regionalTimeserial; } @@ -46,7 +50,7 @@ export abstract class LiveObject { /** * @internal */ - setRegionalTimeserial(regionalTimeserial: string): void { + setRegionalTimeserial(regionalTimeserial: Timeserial): void { this._regionalTimeserial = regionalTimeserial; } @@ -58,6 +62,6 @@ export abstract class LiveObject { /** * @internal */ - abstract applyOperation(op: StateOperation, msg: StateMessage): void; + abstract applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void; protected abstract _getZeroValueData(): T; } diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index 0fb0886ed..9b1981f63 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -8,11 +8,17 @@ import { LiveObject } from './liveobject'; import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool'; import { StateMessage } from './statemessage'; import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool'; +import { DefaultTimeserial, Timeserial } from './timeserial'; enum LiveObjectsEvents { SyncCompleted = 'SyncCompleted', } +export interface BufferedStateMessage { + stateMessage: StateMessage; + regionalTimeserial: Timeserial; +} + export class LiveObjects { private _client: BaseClient; private _channel: RealtimeChannel; @@ -24,6 +30,7 @@ export class LiveObjects { private _syncInProgress: boolean; private _currentSyncId: string | undefined; private _currentSyncCursor: string | undefined; + private _bufferedStateOperations: BufferedStateMessage[]; constructor(channel: RealtimeChannel) { this._channel = channel; @@ -32,6 +39,7 @@ export class LiveObjects { this._liveObjectsPool = new LiveObjectsPool(this); this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this); this._syncInProgress = true; + this._bufferedStateOperations = []; } async getRoot(): Promise { @@ -84,12 +92,23 @@ export class LiveObjects { /** * @internal */ - handleStateMessages(stateMessages: StateMessage[]): void { + handleStateMessages(stateMessages: StateMessage[], msgRegionalTimeserial: string | null | undefined): void { + const timeserial = DefaultTimeserial.calculateTimeserial(this._client, msgRegionalTimeserial); + if (this._syncInProgress) { - // TODO: handle buffering of state messages during SYNC + // The client receives state messages in realtime over the channel concurrently with the SYNC sequence. + // Some of the incoming state messages may have already been applied to the state objects described in + // the SYNC sequence, but others may not; therefore we must buffer these messages so that we can apply + // them to the state objects once the SYNC is complete. To avoid double-counting, the buffered operations + // are applied according to the state object's regional timeserial, which reflects the regional timeserial + // of the state message that was last applied to that state object. + stateMessages.forEach((x) => + this._bufferedStateOperations.push({ stateMessage: x, regionalTimeserial: timeserial }), + ); + return; } - this._liveObjectsPool.applyStateMessages(stateMessages); + this._liveObjectsPool.applyStateMessages(stateMessages, timeserial); } /** @@ -100,7 +119,7 @@ export class LiveObjects { this._client.logger, this._client.Logger.LOG_MINOR, 'LiveObjects.onAttached()', - 'channel = ' + this._channel.name + ', hasState = ' + hasState, + `channel=${this._channel.name}, hasState=${hasState}`, ); if (hasState) { @@ -135,6 +154,8 @@ export class LiveObjects { } private _startNewSync(syncId?: string, syncCursor?: string): void { + // need to discard all buffered state operation messages on new sync start + this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); this._currentSyncId = syncId; this._currentSyncCursor = syncCursor; @@ -142,9 +163,11 @@ export class LiveObjects { } private _endSync(): void { - // TODO: handle applying buffered state messages when SYNC is finished - this._applySync(); + // should apply buffered state operations after we applied the SYNC data + this._liveObjectsPool.applyBufferedStateMessages(this._bufferedStateOperations); + + this._bufferedStateOperations = []; this._syncLiveObjectsDataPool.reset(); this._currentSyncId = undefined; this._currentSyncCursor = undefined; @@ -180,10 +203,11 @@ export class LiveObjects { for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) { receivedObjectIds.add(objectId); const existingObject = this._liveObjectsPool.get(objectId); + const regionalTimeserialObj = DefaultTimeserial.calculateTimeserial(this._client, entry.regionalTimeserial); if (existingObject) { existingObject.setData(entry.objectData); - existingObject.setRegionalTimeserial(entry.regionalTimeserial); + existingObject.setRegionalTimeserial(regionalTimeserialObj); if (existingObject instanceof LiveCounter) { existingObject.setCreated((entry as LiveCounterDataEntry).created); } @@ -195,17 +219,16 @@ export class LiveObjects { const objectType = entry.objectType; switch (objectType) { case 'LiveCounter': - newObject = new LiveCounter(this, entry.created, entry.objectData, objectId); + newObject = new LiveCounter(this, entry.created, entry.objectData, objectId, regionalTimeserialObj); break; case 'LiveMap': - newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId); + newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId, regionalTimeserialObj); break; default: throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 50000, 500); } - newObject.setRegionalTimeserial(entry.regionalTimeserial); this._liveObjectsPool.set(objectId, newObject); } diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index 6db02c78e..3d41e2e23 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -3,9 +3,10 @@ import type RealtimeChannel from 'common/lib/client/realtimechannel'; import { LiveCounter } from './livecounter'; import { LiveMap } from './livemap'; import { LiveObject } from './liveobject'; -import { LiveObjects } from './liveobjects'; +import { BufferedStateMessage, LiveObjects } from './liveobjects'; import { ObjectId } from './objectid'; import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage'; +import { DefaultTimeserial, Timeserial } from './timeserial'; export const ROOT_OBJECT_ID = 'root'; @@ -51,22 +52,24 @@ export class LiveObjectsPool { } const parsedObjectId = ObjectId.fromString(this._client, objectId); + // use zero value timeserial, so any operation can be applied for this object + const regionalTimeserial = DefaultTimeserial.zeroValueTimeserial(this._client); let zeroValueObject: LiveObject; switch (parsedObjectId.type) { case 'map': { - zeroValueObject = LiveMap.zeroValue(this._liveObjects, objectId); + zeroValueObject = LiveMap.zeroValue(this._liveObjects, objectId, regionalTimeserial); break; } case 'counter': - zeroValueObject = LiveCounter.zeroValue(this._liveObjects, false, objectId); + zeroValueObject = LiveCounter.zeroValue(this._liveObjects, false, objectId, regionalTimeserial); break; } this.set(objectId, zeroValueObject); } - applyStateMessages(stateMessages: StateMessage[]): void { + applyStateMessages(stateMessages: StateMessage[], regionalTimeserial: Timeserial): void { for (const stateMessage of stateMessages) { if (!stateMessage.operation) { this._client.Logger.logAction( @@ -87,17 +90,17 @@ export class LiveObjectsPool { // object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op), // so delegate application of the op to that object // TODO: invoke subscription callbacks for an object when applied - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial); break; } // otherwise we can create new objects in the pool if (stateOperation.action === StateOperationAction.MAP_CREATE) { - this._handleMapCreate(stateOperation); + this._handleMapCreate(stateOperation, regionalTimeserial); } if (stateOperation.action === StateOperationAction.COUNTER_CREATE) { - this._handleCounterCreate(stateOperation); + this._handleCounterCreate(stateOperation, regionalTimeserial); } break; @@ -109,7 +112,7 @@ export class LiveObjectsPool { // when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object. this.createZeroValueObjectIfNotExists(stateOperation.objectId); // TODO: invoke subscription callbacks for an object when applied - this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage); + this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial); break; default: @@ -123,6 +126,46 @@ export class LiveObjectsPool { } } + applyBufferedStateMessages(bufferedStateMessages: BufferedStateMessage[]): void { + // since we receive state operation messages concurrently with the SYNC sequence, + // we must determine which operation messages should be applied to the now local copy of the object pool, and the rest will be skipped. + // since messages are delivered in regional order to the client, we can inspect the regional timeserial + // of each state operation message to know whether it has reached a point in the message stream + // that is no longer included in the state object snapshot we received from SYNC sequence. + for (const { regionalTimeserial, stateMessage } of bufferedStateMessages) { + if (!stateMessage.operation) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MAJOR, + 'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()', + `state operation message is received without 'operation' field, skipping message; message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + const existingObject = this.get(stateMessage.operation.objectId); + if (!existingObject) { + // for object ids we haven't seen yet we can apply operation immediately + this.applyStateMessages([stateMessage], regionalTimeserial); + continue; + } + + // otherwise we need to compare regional timeserials + if (regionalTimeserial.before(existingObject.getRegionalTimeserial())) { + // the operation's regional timeserial < the object's timeserial, ignore the operation. + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_MICRO, + 'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()', + `skipping applying buffered state operation message as existing object has greater regional timeserial: ${existingObject.getRegionalTimeserial().toString()}, than the op: ${regionalTimeserial.toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`, + ); + continue; + } + + this.applyStateMessages([stateMessage], regionalTimeserial); + } + } + private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID); @@ -130,28 +173,29 @@ export class LiveObjectsPool { return pool; } - private _handleCounterCreate(stateOperation: StateOperation): void { + private _handleCounterCreate(stateOperation: StateOperation, opRegionalTimeserial: Timeserial): void { let counter: LiveCounter; if (this._client.Utils.isNil(stateOperation.counter)) { // if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly a zero-value counter. - counter = LiveCounter.zeroValue(this._liveObjects, true, stateOperation.objectId); + counter = LiveCounter.zeroValue(this._liveObjects, true, stateOperation.objectId, opRegionalTimeserial); } else { counter = new LiveCounter( this._liveObjects, true, { data: stateOperation.counter.count ?? 0 }, stateOperation.objectId, + opRegionalTimeserial, ); } this.set(stateOperation.objectId, counter); } - private _handleMapCreate(stateOperation: StateOperation): void { + private _handleMapCreate(stateOperation: StateOperation, opRegionalTimeserial: Timeserial): void { let map: LiveMap; if (this._client.Utils.isNil(stateOperation.map)) { // if a map object is missing for the MAP_CREATE op, the initial value is implicitly a zero-value map. - map = LiveMap.zeroValue(this._liveObjects, stateOperation.objectId); + map = LiveMap.zeroValue(this._liveObjects, stateOperation.objectId, opRegionalTimeserial); } else { const objectData = LiveMap.liveMapDataFromMapEntries(this._client, stateOperation.map.entries ?? {}); map = new LiveMap( @@ -159,6 +203,7 @@ export class LiveObjectsPool { stateOperation.map.semantics ?? MapSemantics.LWW, objectData, stateOperation.objectId, + opRegionalTimeserial, ); } diff --git a/test/common/modules/live_objects_helper.js b/test/common/modules/live_objects_helper.js index b2793804e..82c65b737 100644 --- a/test/common/modules/live_objects_helper.js +++ b/test/common/modules/live_objects_helper.js @@ -3,7 +3,9 @@ /** * LiveObjects helper to create pre-determined state tree on channels */ -define(['shared_helper'], function (Helper) { +define(['ably', 'shared_helper', 'live_objects'], function (Ably, Helper, LiveObjectsPlugin) { + const createPM = Ably.makeProtocolMessageFromDeserialized({ LiveObjectsPlugin }); + const ACTIONS = { MAP_CREATE: 0, MAP_SET: 1, @@ -18,6 +20,7 @@ define(['shared_helper'], function (Helper) { class LiveObjectsHelper { constructor(helper) { + this._helper = helper; this._rest = helper.AblyRest({ useBinaryProtocol: false }); } @@ -171,6 +174,97 @@ define(['shared_helper'], function (Helper) { return op; } + mapObject(opts) { + const { objectId, regionalTimeserial, entries } = opts; + const obj = { + object: { + objectId, + regionalTimeserial, + map: { entries }, + }, + }; + + return obj; + } + + counterObject(opts) { + const { objectId, regionalTimeserial, count } = opts; + const obj = { + object: { + objectId, + regionalTimeserial, + counter: { + created: true, + count, + }, + }, + }; + + return obj; + } + + stateOperationMessage(opts) { + const { channelName, serial, state } = opts; + + state?.forEach((x, i) => (x.serial = `${serial}:${i}`)); + + return { + action: 19, // STATE + channel: channelName, + channelSerial: serial, + state: state ?? [], + }; + } + + stateObjectMessage(opts) { + const { channelName, syncSerial, state } = opts; + + return { + action: 20, // STATE_SYNC + channel: channelName, + channelSerial: syncSerial, + state: state ?? [], + }; + } + + async processStateOperationMessageOnChannel(opts) { + const { channel, ...rest } = opts; + + this._helper.recordPrivateApi('call.channel.processMessage'); + this._helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); + await channel.processMessage( + createPM( + this.stateOperationMessage({ + ...rest, + channelName: channel.name, + }), + ), + ); + } + + async processStateObjectMessageOnChannel(opts) { + const { channel, ...rest } = opts; + + this._helper.recordPrivateApi('call.channel.processMessage'); + this._helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); + await channel.processMessage( + createPM( + this.stateObjectMessage({ + ...rest, + channelName: channel.name, + }), + ), + ); + } + + fakeMapObjectId() { + return `map:${Helper.randomString()}`; + } + + fakeCounterObjectId() { + return `counter:${Helper.randomString()}`; + } + async stateRequest(channelName, opBody) { if (Array.isArray(opBody)) { throw new Error(`Only single object state requests are supported`); diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 885760acf..4d472d1a4 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -61,6 +61,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], /** @nospec */ it(`doesn't break when it receives a STATE ProtocolMessage`, async function () { const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); const testClient = helper.AblyRealtime(); await helper.monitorConnectionThenCloseAndFinish(async () => { @@ -73,25 +74,13 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await helper.monitorConnectionThenCloseAndFinish(async () => { // inject STATE message that should be ignored and not break anything without LiveObjects plugin - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await testChannel.processMessage( - createPM({ - action: 19, - channel: 'channel', - channelSerial: 'serial:', - state: [ - { - operation: { - action: 1, - objectId: 'root', - mapOp: { key: 'stringKey', data: { value: 'stringValue' } }, - }, - serial: 'a@0-0', - }, - ], - }), - ); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel: testChannel, + serial: '@0-0', + state: [ + liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'stringKey', data: { value: 'stringValue' } }), + ], + }); const publishChannel = publishClient.channels.get('channel'); await publishChannel.publish(null, 'test'); @@ -105,6 +94,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], /** @nospec */ it(`doesn't break when it receives a STATE_SYNC ProtocolMessage`, async function () { const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); const testClient = helper.AblyRealtime(); await helper.monitorConnectionThenCloseAndFinish(async () => { @@ -117,24 +107,11 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await helper.monitorConnectionThenCloseAndFinish(async () => { // inject STATE_SYNC message that should be ignored and not break anything without LiveObjects plugin - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await testChannel.processMessage( - createPM({ - action: 20, - channel: 'channel', - channelSerial: 'serial:', - state: [ - { - object: { - objectId: 'root', - regionalTimeserial: 'a@0-0', - map: {}, - }, - }, - ], - }), - ); + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel: testChannel, + syncSerial: 'serial:', + state: [liveObjectsHelper.mapObject({ objectId: 'root', regionalTimeserial: '@0-0' })], + }); const publishChannel = publishClient.channels.get('channel'); await publishChannel.publish(null, 'test'); @@ -259,8 +236,9 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }); /** @nospec */ - it('getRoot() waits for subsequent STATE_SYNC to finish before resolving', async function () { + it('getRoot() waits for STATE_SYNC with empty cursor before resolving', async function () { const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); const client = RealtimeWithLiveObjects(helper); await helper.monitorConnectionThenCloseAndFinish(async () => { @@ -272,23 +250,17 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], await liveObjects.getRoot(); // inject STATE_SYNC message to emulate start of a new sequence - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await channel.processMessage( - createPM({ - action: 20, - channel: 'channel', - // have cursor so client awaits for additional STATE_SYNC messages - channelSerial: 'serial:cursor', - state: [], - }), - ); + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + // have cursor so client awaits for additional STATE_SYNC messages + syncSerial: 'serial:cursor', + }); let getRootResolved = false; - let newRoot; + let root; liveObjects.getRoot().then((value) => { getRootResolved = true; - newRoot = value; + root = value; }); // wait for next tick to check that getRoot() promise handler didn't proc @@ -297,42 +269,26 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], expect(getRootResolved, 'Check getRoot() is not resolved while STATE_SYNC is in progress').to.be.false; - // inject next STATE_SYNC message - helper.recordPrivateApi('call.channel.processMessage'); - helper.recordPrivateApi('call.makeProtocolMessageFromDeserialized'); - await channel.processMessage( - createPM({ - action: 20, - channel: 'channel', - // no cursor to indicate the end of STATE_SYNC messages - channelSerial: 'serial:', - state: [ - { - object: { - objectId: 'root', - regionalTimeserial: 'a@0-0', - map: { - entries: { - key: { - timeserial: 'a@0-0', - data: { - value: 1, - }, - }, - }, - }, - }, - }, - ], - }), - ); + // inject final STATE_SYNC message + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + // no cursor to indicate the end of STATE_SYNC messages + syncSerial: 'serial:', + state: [ + liveObjectsHelper.mapObject({ + objectId: 'root', + regionalTimeserial: '@0-0', + entries: { key: { timeserial: '@0-0', data: { value: 1 } } }, + }), + ], + }); // wait for next tick for getRoot() handler to process helper.recordPrivateApi('call.Platform.nextTick'); await new Promise((res) => nextTick(res)); expect(getRootResolved, 'Check getRoot() is resolved when STATE_SYNC sequence has ended').to.be.true; - expect(newRoot.get('key')).to.equal(1, 'Check new root after STATE_SYNC sequence has expected key'); + expect(root.get('key')).to.equal(1, 'Check new root after STATE_SYNC sequence has expected key'); }, client); }); @@ -503,6 +459,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], data: { value: 'eyJwcm9kdWN0SWQiOiAiMDAxIiwgInByb2R1Y3ROYW1lIjogImNhciJ9', encoding: 'base64' }, }, { key: 'emptyBytesKey', data: { value: '', encoding: 'base64' } }, + { key: 'maxSafeIntegerKey', data: { value: Number.MAX_SAFE_INTEGER } }, + { key: 'negativeMaxSafeIntegerKey', data: { value: -Number.MAX_SAFE_INTEGER } }, { key: 'numberKey', data: { value: 1 } }, { key: 'zeroKey', data: { value: 0 } }, { key: 'trueKey', data: { value: true } }, @@ -946,6 +904,329 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, client); }); } + + const operationsDuringSyncSequence = [ + { + description: 'state operation messages are buffered during STATE_SYNC sequence', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, it should not be applied as sync is in progress + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // check root doesn't have data from operations + primitiveKeyData.forEach((keyData) => { + expect(root.get(keyData.key), `Check "${keyData.key}" key doesn't exist on root during STATE_SYNC`).to.not + .exist; + }); + }, + }, + + { + description: 'buffered state operation messages are applied when STATE_SYNC sequence ends', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, they should be applied when sync ends + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // end the sync with empty cursor + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // check everything is applied correctly + primitiveKeyData.forEach((keyData) => { + if (keyData.data.encoding) { + expect( + BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)), + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ).to.be.true; + } else { + expect(root.get(keyData.key)).to.equal( + keyData.data.value, + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ); + } + }); + }, + }, + + { + description: 'buffered state operation messages are discarded when new STATE_SYNC sequence starts', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, expect them to be discarded when sync with new sequence id starts + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // start new sync with new sequence id + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'otherserial:cursor', + }); + + // inject another operation that should be applied when latest sync ends + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'foo', data: { value: 'bar' } })], + }); + + // end sync + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'otherserial:', + }); + + // check root doesn't have data from operations received during first sync + primitiveKeyData.forEach((keyData) => { + expect( + root.get(keyData.key), + `Check "${keyData.key}" key doesn't exist on root when STATE_SYNC has ended`, + ).to.not.exist; + }); + + // check root has data from operations received during second sync + expect(root.get('foo')).to.equal( + 'bar', + 'Check root has data from operations received during second STATE_SYNC sequence', + ); + }, + }, + + { + description: 'buffered state operation messages are applied based on regional timeserial of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + const mapId = liveObjectsHelper.fakeMapObjectId(); + const counterId = liveObjectsHelper.fakeCounterObjectId(); + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + // add state object messages with non-zero regional timeserials + state: [ + liveObjectsHelper.mapObject({ + objectId: 'root', + regionalTimeserial: '@1-0', + entries: { + map: { timeserial: '@0-0', data: { objectId: mapId } }, + counter: { timeserial: '@0-0', data: { objectId: counterId } }, + }, + }), + liveObjectsHelper.mapObject({ + objectId: mapId, + regionalTimeserial: '@1-0', + }), + liveObjectsHelper.counterObject({ + objectId: counterId, + regionalTimeserial: '@1-0', + }), + ], + }); + + // inject operations with older regional timeserial, expect them not to be applied when sync ends + await Promise.all( + ['root', mapId].flatMap((objectId) => + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })], + }), + ), + ), + ); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], + }); + + // inject operations with greater regional timeserial, expect them to be applied when sync ends + await Promise.all( + ['root', mapId].map((objectId) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@2-0', + state: [liveObjectsHelper.mapSetOp({ objectId, key: 'foo', data: { value: 'bar' } })], + }), + ), + ); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@2-0', + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], + }); + + // end sync + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // check operations with older regional timeserial are not applied + // counter will be checked to match an expected value explicitly, so no need to check that it doesn't equal a sum of operations + primitiveKeyData.forEach((keyData) => { + expect( + root.get(keyData.key), + `Check "${keyData.key}" key doesn't exist on root when STATE_SYNC has ended`, + ).to.not.exist; + }); + primitiveKeyData.forEach((keyData) => { + expect( + root.get('map').get(keyData.key), + `Check "${keyData.key}" key doesn't exist on inner map when STATE_SYNC has ended`, + ).to.not.exist; + }); + + // check operations with greater regional timeserial are applied + expect(root.get('foo')).to.equal( + 'bar', + 'Check only data from operations with greater regional timeserial exists on root after STATE_SYNC', + ); + expect(root.get('map').get('foo')).to.equal( + 'bar', + 'Check only data from operations with greater regional timeserial exists on inner map after STATE_SYNC', + ); + expect(root.get('counter').value()).to.equal( + 1, + 'Check only increment operations with greater regional timeserial were applied to counter after STATE_SYNC', + ); + }, + }, + + { + description: + 'subsequent state operation messages are applied immediately after STATE_SYNC ended and buffers are applied', + action: async (ctx) => { + const { root, liveObjectsHelper, channel, channelName } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, they should be applied when sync ends + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // end the sync with empty cursor + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // send some more operations + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: 'root', + key: 'foo', + data: { value: 'bar' }, + }), + ); + + // check buffered operations are applied, as well as the most recent operation outside of the STATE_SYNC is applied + primitiveKeyData.forEach((keyData) => { + if (keyData.data.encoding) { + expect( + BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)), + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ).to.be.true; + } else { + expect(root.get(keyData.key)).to.equal( + keyData.data.value, + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ); + } + }); + expect(root.get('foo')).to.equal( + 'bar', + 'Check root has correct value for "foo" key from operation received outside of STATE_SYNC after other buffered operations were applied', + ); + }, + }, + ]; + + for (const scenario of operationsDuringSyncSequence) { + if (scenario.skip === true) { + continue; + } + + /** @nospec */ + it(scenario.description, async function () { + const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); + + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; + + await channel.attach(); + // wait for getRoot() to resolve so the initial SYNC sequence is completed, + // as we're going to initiate a new one to test applying operations during SYNC sequence. + const root = await liveObjects.getRoot(); + + await scenario.action({ root, liveObjectsHelper, channelName, channel }); + }, client); + }); + } }); /** @nospec */