Skip to content

Commit

Permalink
Add LiveObjects tests for buffering and flushing operations outside o…
Browse files Browse the repository at this point in the history
…f sync sequence
  • Loading branch information
VeskeR committed Oct 25, 2024
1 parent c4f4d36 commit 0a3ff83
Showing 1 changed file with 326 additions and 1 deletion.
327 changes: 326 additions & 1 deletion test/realtime/live_objects.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
});

/** @nospec */
it('getRoot() waits for subsequent STATE_SYNC to finish before resolving', async function () {
it('getRoot() waits for STATE_SYNC with empty cursor before resolving', async function () {
const helper = this.test.helper;
const liveObjectsHelper = new LiveObjectsHelper(helper);
const client = RealtimeWithLiveObjects(helper);
Expand Down Expand Up @@ -459,6 +459,8 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
data: { value: 'eyJwcm9kdWN0SWQiOiAiMDAxIiwgInByb2R1Y3ROYW1lIjogImNhciJ9', encoding: 'base64' },
},
{ key: 'emptyBytesKey', data: { value: '', encoding: 'base64' } },
{ key: 'maxSafeIntegerKey', data: { value: Number.MAX_SAFE_INTEGER } },
{ key: 'negativeMaxSafeIntegerKey', data: { value: -Number.MAX_SAFE_INTEGER } },
{ key: 'numberKey', data: { value: 1 } },
{ key: 'zeroKey', data: { value: 0 } },
{ key: 'trueKey', data: { value: true } },
Expand Down Expand Up @@ -903,6 +905,329 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
}, client);
});
}

const operationsDuringSyncSequence = [
{
description: 'state operation messages are buffered during STATE_SYNC sequence',
action: async (ctx) => {
const { root, liveObjectsHelper, channel } = ctx;

// start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:cursor',
});

// inject operations, it should not be applied as sync is in progress
await Promise.all(
primitiveKeyData.map((keyData) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })],
}),
),
);

// check root doesn't have data from operations
primitiveKeyData.forEach((keyData) => {
expect(root.get(keyData.key), `Check "${keyData.key}" key doesn't exist on root during STATE_SYNC`).to.not
.exist;
});
},
},

{
description: 'buffered state operation messages are applied when STATE_SYNC sequence ends',
action: async (ctx) => {
const { root, liveObjectsHelper, channel } = ctx;

// start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:cursor',
});

// inject operations, they should be applied when sync ends
await Promise.all(
primitiveKeyData.map((keyData) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })],
}),
),
);

// end the sync with empty cursor
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:',
});

// check everything is applied correctly
primitiveKeyData.forEach((keyData) => {
if (keyData.data.encoding) {
expect(
BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)),
`Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`,
).to.be.true;
} else {
expect(root.get(keyData.key)).to.equal(
keyData.data.value,
`Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`,
);
}
});
},
},

{
description: 'buffered state operation messages are discarded when new STATE_SYNC sequence starts',
action: async (ctx) => {
const { root, liveObjectsHelper, channel } = ctx;

// start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:cursor',
});

// inject operations, expect them to be discarded when sync with new sequence id starts
await Promise.all(
primitiveKeyData.map((keyData) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })],
}),
),
);

// start new sync with new sequence id
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'otherserial:cursor',
});

// inject another operation that should be applied when latest sync ends
await liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'foo', data: { value: 'bar' } })],
});

// end sync
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'otherserial:',
});

// check root doesn't have data from operations received during first sync
primitiveKeyData.forEach((keyData) => {
expect(
root.get(keyData.key),
`Check "${keyData.key}" key doesn't exist on root when STATE_SYNC has ended`,
).to.not.exist;
});

// check root has data from operations received during second sync
expect(root.get('foo')).to.equal(
'bar',
'Check root has data from operations received during second STATE_SYNC sequence',
);
},
},

{
description: 'buffered state operation messages are applied based on regional timeserial of the object',
action: async (ctx) => {
const { root, liveObjectsHelper, channel } = ctx;

// start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages
const mapId = liveObjectsHelper.fakeMapObjectId();
const counterId = liveObjectsHelper.fakeCounterObjectId();
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:cursor',
// add state object messages with non-zero regional timeserials
state: [
liveObjectsHelper.mapObject({
objectId: 'root',
regionalTimeserial: '@1-0',
entries: {
map: { timeserial: '@0-0', data: { objectId: mapId } },
counter: { timeserial: '@0-0', data: { objectId: counterId } },
},
}),
liveObjectsHelper.mapObject({
objectId: mapId,
regionalTimeserial: '@1-0',
}),
liveObjectsHelper.counterObject({
objectId: counterId,
regionalTimeserial: '@1-0',
}),
],
});

// inject operations with older regional timeserial, expect them not to be applied when sync ends
await Promise.all(
['root', mapId].flatMap((objectId) =>
primitiveKeyData.map((keyData) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })],
}),
),
),
);
await liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })],
});

// inject operations with greater regional timeserial, expect them to be applied when sync ends
await Promise.all(
['root', mapId].map((objectId) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@2-0',
state: [liveObjectsHelper.mapSetOp({ objectId, key: 'foo', data: { value: 'bar' } })],
}),
),
);
await liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@2-0',
state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })],
});

// end sync
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:',
});

// check operations with older regional timeserial are not applied
// counter will be checked to match an expected value explicitly, so no need to check that it doesn't equal a sum of operations
primitiveKeyData.forEach((keyData) => {
expect(
root.get(keyData.key),
`Check "${keyData.key}" key doesn't exist on root when STATE_SYNC has ended`,
).to.not.exist;
});
primitiveKeyData.forEach((keyData) => {
expect(
root.get('map').get(keyData.key),
`Check "${keyData.key}" key doesn't exist on inner map when STATE_SYNC has ended`,
).to.not.exist;
});

// check operations with greater regional timeserial are applied
expect(root.get('foo')).to.equal(
'bar',
'Check only data from operations with greater regional timeserial exists on root after STATE_SYNC',
);
expect(root.get('map').get('foo')).to.equal(
'bar',
'Check only data from operations with greater regional timeserial exists on inner map after STATE_SYNC',
);
expect(root.get('counter').value()).to.equal(
1,
'Check only increment operations with greater regional timeserial were applied to counter after STATE_SYNC',
);
},
},

{
description:
'subsequent state operation messages are applied immediately after STATE_SYNC ended and buffers are applied',
action: async (ctx) => {
const { root, liveObjectsHelper, channel, channelName } = ctx;

// start new sync sequence with a cursor so client will wait for the next STATE_SYNC messages
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:cursor',
});

// inject operations, they should be applied when sync ends
await Promise.all(
primitiveKeyData.map((keyData) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: keyData.key, data: keyData.data })],
}),
),
);

// end the sync with empty cursor
await liveObjectsHelper.processStateObjectMessageOnChannel({
channel,
syncSerial: 'serial:',
});

// send some more operations
await liveObjectsHelper.stateRequest(
channelName,
liveObjectsHelper.mapSetOp({
objectId: 'root',
key: 'foo',
data: { value: 'bar' },
}),
);

// check buffered operations are applied, as well as the most recent operation outside of the STATE_SYNC is applied
primitiveKeyData.forEach((keyData) => {
if (keyData.data.encoding) {
expect(
BufferUtils.areBuffersEqual(root.get(keyData.key), BufferUtils.base64Decode(keyData.data.value)),
`Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`,
).to.be.true;
} else {
expect(root.get(keyData.key)).to.equal(
keyData.data.value,
`Check root has correct value for "${keyData.key}" key after STATE_SYNC has ended and buffered operations are applied`,
);
}
});
expect(root.get('foo')).to.equal(
'bar',
'Check root has correct value for "foo" key from operation received outside of STATE_SYNC after other buffered operations were applied',
);
},
},
];

for (const scenario of operationsDuringSyncSequence) {
if (scenario.skip === true) {
continue;
}

/** @nospec */
it(scenario.description, async function () {
const helper = this.test.helper;
const liveObjectsHelper = new LiveObjectsHelper(helper);
const client = RealtimeWithLiveObjects(helper);

await helper.monitorConnectionThenCloseAndFinish(async () => {
const channelName = scenario.description;
const channel = client.channels.get(channelName, channelOptionsWithLiveObjects());
const liveObjects = channel.liveObjects;

await channel.attach();
// wait for getRoot() to resolve so the initial SYNC sequence is completed,
// as we're going to initiate a new one to test applying operations during SYNC sequence.
const root = await liveObjects.getRoot();

await scenario.action({ root, liveObjectsHelper, channelName, channel });
}, client);
});
}
});

/** @nospec */
Expand Down

0 comments on commit 0a3ff83

Please sign in to comment.