From ef7b975530fbb8c6dda69f1442f1e9d3e545e308 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Thu, 24 Oct 2024 23:28:45 +0100 Subject: [PATCH] Add LiveObjects tests for buffering and flushing operations outside of sync sequence --- test/realtime/live_objects.test.js | 327 ++++++++++++++++++++++++++++- 1 file changed, 326 insertions(+), 1 deletion(-) diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 0402e39da..4d472d1a4 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -236,7 +236,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }); /** @nospec */ - it('getRoot() waits for subsequent STATE_SYNC to finish before resolving', async function () { + it('getRoot() waits for STATE_SYNC with empty cursor before resolving', async function () { const helper = this.test.helper; const liveObjectsHelper = new LiveObjectsHelper(helper); const client = RealtimeWithLiveObjects(helper); @@ -459,6 +459,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], data: { value: 'eyJwcm9kdWN0SWQiOiAiMDAxIiwgInByb2R1Y3ROYW1lIjogImNhciJ9', encoding: 'base64' }, }, { key: 'emptyBytesKey', data: { value: '', encoding: 'base64' } }, + { key: 'maxSafeIntegerKey', data: { value: Number.MAX_SAFE_INTEGER } }, + { key: 'negativeMaxSafeIntegerKey', data: { value: -Number.MAX_SAFE_INTEGER } }, { key: 'numberKey', data: { value: 1 } }, { key: 'zeroKey', data: { value: 0 } }, { key: 'trueKey', data: { value: true } }, @@ -902,6 +904,329 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, client); }); } + + const operationsDuringSyncSequence = [ + { + description: 'state operation messages are buffered during STATE_SYNC sequence', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, it should not be applied as sync is in progress + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // check root doesn't have data from operations + primitiveKeyData.forEach((keyData) => { + expect(root.get(keyData.key), `Check "${keyData.key}" key doesn't exist on root during STATE_SYNC`).to.not + .exist; + }); + }, + }, + + { + description: 'buffered state operation messages are applied when STATE_SYNC sequence ends', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, they should be applied when sync ends + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // end the sync with empty cursor + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // check everything is applied correctly + primitiveKeyData.forEach((keyData) => { + if (keyData.data.encoding) { + expect( + BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)), + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ).to.be.true; + } else { + expect(root.get(keyData.key)).to.equal( + keyData.data.value, + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ); + } + }); + }, + }, + + { + description: 'buffered state operation messages are discarded when new STATE_SYNC sequence starts', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, expect them to be discarded when sync with new sequence id starts + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // start new sync with new sequence id + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'otherserial:cursor', + }); + + // inject another operation that should be applied when latest sync ends + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'foo', data: { value: 'bar' } })], + }); + + // end sync + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'otherserial:', + }); + + // check root doesn't have data from operations received during first sync + primitiveKeyData.forEach((keyData) => { + expect( + root.get(keyData.key), + `Check "${keyData.key}" key doesn't exist on root when STATE_SYNC has ended`, + ).to.not.exist; + }); + + // check root has data from operations received during second sync + expect(root.get('foo')).to.equal( + 'bar', + 'Check root has data from operations received during second STATE_SYNC sequence', + ); + }, + }, + + { + description: 'buffered state operation messages are applied based on regional timeserial of the object', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + const mapId = liveObjectsHelper.fakeMapObjectId(); + const counterId = liveObjectsHelper.fakeCounterObjectId(); + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + // add state object messages with non-zero regional timeserials + state: [ + liveObjectsHelper.mapObject({ + objectId: 'root', + regionalTimeserial: '@1-0', + entries: { + map: { timeserial: '@0-0', data: { objectId: mapId } }, + counter: { timeserial: '@0-0', data: { objectId: counterId } }, + }, + }), + liveObjectsHelper.mapObject({ + objectId: mapId, + regionalTimeserial: '@1-0', + }), + liveObjectsHelper.counterObject({ + objectId: counterId, + regionalTimeserial: '@1-0', + }), + ], + }); + + // inject operations with older regional timeserial, expect them not to be applied when sync ends + await Promise.all( + ['root', mapId].flatMap((objectId) => + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })], + }), + ), + ), + ); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], + }); + + // inject operations with greater regional timeserial, expect them to be applied when sync ends + await Promise.all( + ['root', mapId].map((objectId) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@2-0', + state: [liveObjectsHelper.mapSetOp({ objectId, key: 'foo', data: { value: 'bar' } })], + }), + ), + ); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@2-0', + state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })], + }); + + // end sync + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // check operations with older regional timeserial are not applied + // counter will be checked to match an expected value explicitly, so no need to check that it doesn't equal a sum of operations + primitiveKeyData.forEach((keyData) => { + expect( + root.get(keyData.key), + `Check "${keyData.key}" key doesn't exist on root when STATE_SYNC has ended`, + ).to.not.exist; + }); + primitiveKeyData.forEach((keyData) => { + expect( + root.get('map').get(keyData.key), + `Check "${keyData.key}" key doesn't exist on inner map when STATE_SYNC has ended`, + ).to.not.exist; + }); + + // check operations with greater regional timeserial are applied + expect(root.get('foo')).to.equal( + 'bar', + 'Check only data from operations with greater regional timeserial exists on root after STATE_SYNC', + ); + expect(root.get('map').get('foo')).to.equal( + 'bar', + 'Check only data from operations with greater regional timeserial exists on inner map after STATE_SYNC', + ); + expect(root.get('counter').value()).to.equal( + 1, + 'Check only increment operations with greater regional timeserial were applied to counter after STATE_SYNC', + ); + }, + }, + + { + description: + 'subsequent state operation messages are applied immediately after STATE_SYNC ended and buffers are applied', + action: async (ctx) => { + const { root, liveObjectsHelper, channel, channelName } = ctx; + + // start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:cursor', + }); + + // inject operations, they should be applied when sync ends + await Promise.all( + primitiveKeyData.map((keyData) => + liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: '@0-0', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })], + }), + ), + ); + + // end the sync with empty cursor + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // send some more operations + await liveObjectsHelper.stateRequest( + channelName, + liveObjectsHelper.mapSetOp({ + objectId: 'root', + key: 'foo', + data: { value: 'bar' }, + }), + ); + + // check buffered operations are applied, as well as the most recent operation outside of the STATE_SYNC is applied + primitiveKeyData.forEach((keyData) => { + if (keyData.data.encoding) { + expect( + BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)), + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ).to.be.true; + } else { + expect(root.get(keyData.key)).to.equal( + keyData.data.value, + `Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`, + ); + } + }); + expect(root.get('foo')).to.equal( + 'bar', + 'Check root has correct value for "foo" key from operation received outside of STATE_SYNC after other buffered operations were applied', + ); + }, + }, + ]; + + for (const scenario of operationsDuringSyncSequence) { + if (scenario.skip === true) { + continue; + } + + /** @nospec */ + it(scenario.description, async function () { + const helper = this.test.helper; + const liveObjectsHelper = new LiveObjectsHelper(helper); + const client = RealtimeWithLiveObjects(helper); + + await helper.monitorConnectionThenCloseAndFinish(async () => { + const channelName = scenario.description; + const channel = client.channels.get(channelName, channelOptionsWithLiveObjects()); + const liveObjects = channel.liveObjects; + + await channel.attach(); + // wait for getRoot() to resolve so the initial SYNC sequence is completed, + // as we're going to initiate a new one to test applying operations during SYNC sequence. + const root = await liveObjects.getRoot(); + + await scenario.action({ root, liveObjectsHelper, channelName, channel }); + }, client); + }); + } }); /** @nospec */