From 92c8f111cddd369bbe69e1b570454d3df56f4d57 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Thu, 5 Dec 2024 04:26:15 +0000 Subject: [PATCH] Surface live objects to the end users only when they become valid Valid live objects are those for which the realtime client has seen the corresponding CREATE operation. Resolves DTP-1104 --- src/plugins/liveobjects/livecounter.ts | 4 +- src/plugins/liveobjects/livemap.ts | 88 +++++- src/plugins/liveobjects/liveobject.ts | 60 ++++ src/plugins/liveobjects/liveobjects.ts | 2 + src/plugins/liveobjects/liveobjectspool.ts | 4 + src/plugins/liveobjects/statemessage.ts | 7 + test/realtime/live_objects.test.js | 304 +++++++++++++++++++-- 7 files changed, 424 insertions(+), 45 deletions(-) diff --git a/src/plugins/liveobjects/livecounter.ts b/src/plugins/liveobjects/livecounter.ts index dc144c325..9cf085365 100644 --- a/src/plugins/liveobjects/livecounter.ts +++ b/src/plugins/liveobjects/livecounter.ts @@ -123,7 +123,7 @@ export class LiveCounter extends LiveObject const previousDataRef = this._dataRef; // override all relevant data for this object with data from the state object - this._createOperationIsMerged = false; + this._setCreateOperationIsMerged(false); this._dataRef = { data: stateObject.counter?.count ?? 0 }; // should default to empty map if site timeserials do not exist on the state object, so that any future operation can be applied to this object this._siteTimeserials = stateObject.siteTimeserials ?? {}; @@ -149,7 +149,7 @@ export class LiveCounter extends LiveObject // if we got here, it means that current counter instance is missing the initial value in its data reference, // which we're going to add now. this._dataRef.data += stateOperation.counter?.count ?? 0; - this._createOperationIsMerged = true; + this._setCreateOperationIsMerged(true); return { update: { inc: stateOperation.counter?.count ?? 0 } }; } diff --git a/src/plugins/liveobjects/livemap.ts b/src/plugins/liveobjects/livemap.ts index 1f3729c5f..e4b1200ef 100644 --- a/src/plugins/liveobjects/livemap.ts +++ b/src/plugins/liveobjects/livemap.ts @@ -1,7 +1,7 @@ import deepEqual from 'deep-equal'; import type * as API from '../../../ably'; -import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; +import { BufferedOperation, LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject'; import { LiveObjects } from './liveobjects'; import { MapSemantics, @@ -77,10 +77,12 @@ export class LiveMap extends LiveObject(key: TKey): T[TKey] { @@ -94,14 +96,26 @@ export class LiveMap extends LiveObject extends LiveObject extends LiveObject extends LiveObject extends LiveObject extends LiveObject { + try { + const update = this._applyMapSet(op, opOriginTimeserial); + this.notifyUpdated(update); + } catch (error) { + this._client.Logger.logAction( + this._client.logger, + this._client.Logger.LOG_ERROR, + `LiveMap._handleMapSetWithInvalidObjectReference()`, + `error applying buffered MAP_SET operation: ${this._client.Utils.inspectError(error)}`, + ); + } finally { + this._bufferedOperations.delete(bufferedOperation); + } + }); + + const bufferedOperation: BufferedOperation = { + cancel: () => off(), + }; + this._bufferedOperations.add(bufferedOperation); + } + private _applyMapCreate(op: StateOperation): LiveMapUpdate | LiveObjectUpdateNoop { if (this._createOperationIsMerged) { // There can't be two different create operation for the same object id, because the object id @@ -395,11 +458,6 @@ export class LiveMap extends LiveObject; protected _createOperationIsMerged: boolean; + protected _bufferedOperations: Set; protected constructor( protected _liveObjects: LiveObjects, @@ -51,6 +64,7 @@ export abstract class LiveObject< this._objectId = objectId; // use empty timeserials vector by default, so any future operation can be applied to this object this._siteTimeserials = {}; + this._bufferedOperations = new Set(); } subscribe(listener: (update: TUpdate) => void): SubscribeResponse { @@ -99,6 +113,42 @@ export abstract class LiveObject< this._eventEmitter.emit(LiveObjectEvents.Updated, update); } + /** + * Object is considered a "valid object" if we have seen the create operation for that object. + * + * Non-valid objects should be treated as though they don't exist from the perspective of the public API for the end users, + * i.e. the public access API that would return this object instead should return an `undefined`. In other words, non-valid + * objects are not surfaced to the end users and they're not able to interact with it. + * + * Once the create operation for the object has been seen and merged, the object becomes valid and can be exposed to the end users. + * + * @internal + */ + isValid(): boolean { + return this._createOperationIsMerged; + } + + /** + * @internal + */ + onceValid(listener: () => void): OnEventResponse { + this._eventEmitter.once(LiveObjectEvents.Valid, listener); + + const off = () => { + this._eventEmitter.off(LiveObjectEvents.Valid, listener); + }; + + return { off }; + } + + /** + * @internal + */ + cancelBufferedOperations(): void { + this._bufferedOperations.forEach((x) => x.cancel()); + this._bufferedOperations.clear(); + } + /** * Returns true if the given origin timeserial indicates that the operation to which it belongs should be applied to the object. * @@ -118,6 +168,16 @@ export abstract class LiveObject< return !siteTimeserial || opOriginTimeserial > siteTimeserial; } + protected _setCreateOperationIsMerged(createOperationIsMerged: boolean): void { + const shouldNotifyValid = + createOperationIsMerged === true && this._createOperationIsMerged !== createOperationIsMerged; + this._createOperationIsMerged = createOperationIsMerged; + + if (shouldNotifyValid) { + this._eventEmitter.emit(LiveObjectEvents.Valid); + } + } + private _createObjectId(): string { // TODO: implement object id generation based on live object type and initial value return Math.random().toString().substring(2); diff --git a/src/plugins/liveobjects/liveobjects.ts b/src/plugins/liveobjects/liveobjects.ts index aa064de78..f3b07e395 100644 --- a/src/plugins/liveobjects/liveobjects.ts +++ b/src/plugins/liveobjects/liveobjects.ts @@ -149,6 +149,8 @@ export class LiveObjects { private _startNewSync(syncId?: string, syncCursor?: string): void { // need to discard all buffered state operation messages on new sync start this._bufferedStateOperations = []; + // cancel any buffered operations for all objects in the pool, as we're overriding the current state and they will no longer be valid + this._liveObjectsPool.cancelBufferedOperations(); this._syncLiveObjectsDataPool.reset(); this._currentSyncId = syncId; this._currentSyncCursor = syncCursor; diff --git a/src/plugins/liveobjects/liveobjectspool.ts b/src/plugins/liveobjects/liveobjectspool.ts index eb42d47b4..0b1fec6cf 100644 --- a/src/plugins/liveobjects/liveobjectspool.ts +++ b/src/plugins/liveobjects/liveobjectspool.ts @@ -62,6 +62,10 @@ export class LiveObjectsPool { this.set(objectId, zeroValueObject); } + cancelBufferedOperations(): void { + this._pool.forEach((x) => x.cancelBufferedOperations()); + } + private _getInitialPool(): Map { const pool = new Map(); const root = LiveMap.zeroValue(this._liveObjects, ROOT_OBJECT_ID); diff --git a/src/plugins/liveobjects/statemessage.ts b/src/plugins/liveobjects/statemessage.ts index 74b385630..f6be7fa4b 100644 --- a/src/plugins/liveobjects/statemessage.ts +++ b/src/plugins/liveobjects/statemessage.ts @@ -318,6 +318,13 @@ export class StateMessage { }; } + /** + * Returns true if this state message is a state operation with `MAP_SET` action and it sets a map entry to point to another objectId. + */ + isMapSetWithObjectIdReference(): boolean { + return this.operation?.action === StateOperationAction.MAP_SET && this.operation.mapOp?.data?.objectId != null; + } + /** * Overload toJSON() to intercept JSON.stringify() * @return {*} diff --git a/test/realtime/live_objects.test.js b/test/realtime/live_objects.test.js index 01eb376fc..6a91bb0ad 100644 --- a/test/realtime/live_objects.test.js +++ b/test/realtime/live_objects.test.js @@ -704,28 +704,33 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], } // check only operations with correct timeserials were applied - const expectedMapValues = [ - { foo: 'bar' }, - { foo: 'bar' }, - { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE - { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE - { foo: 'bar', baz: 'qux' }, // applied MAP_CREATE + const expectedMaps = [ + { exists: false }, // MAP_CREATE not applied, object is not valid and we should get undefined + { exists: false }, // MAP_CREATE not applied, object is not valid and we should get undefined + { exists: true, data: { foo: 'bar', baz: 'qux' } }, // applied MAP_CREATE, object is valid + { exists: true, data: { foo: 'bar', baz: 'qux' } }, // applied MAP_CREATE, object is valid + { exists: true, data: { foo: 'bar', baz: 'qux' } }, // applied MAP_CREATE, object is valid ]; for (const [i, mapId] of mapIds.entries()) { - const expectedMapValue = expectedMapValues[i]; - const expectedKeysCount = Object.keys(expectedMapValue).length; + const expectedMap = expectedMaps[i]; + if (!expectedMap.exists) { + expect(root.get(mapId), `Check map #${i + 1} does not exist on root as MAP_CREATE op was not applied`) + .to.not.exist; + } else { + const expectedKeysCount = Object.keys(expectedMap.data).length; - expect(root.get(mapId).size()).to.equal( - expectedKeysCount, - `Check map #${i + 1} has expected number of keys after MAP_CREATE ops`, - ); - Object.entries(expectedMapValue).forEach(([key, value]) => { - expect(root.get(mapId).get(key)).to.equal( - value, - `Check map #${i + 1} has expected value for "${key}" key after MAP_CREATE ops`, + expect(root.get(mapId).size()).to.equal( + expectedKeysCount, + `Check map #${i + 1} has expected number of keys after MAP_CREATE ops`, ); - }); + Object.entries(expectedMap.data).forEach(([key, value]) => { + expect(root.get(mapId).get(key)).to.equal( + value, + `Check map #${i + 1} has expected value for "${key}" key after MAP_CREATE ops`, + ); + }); + } } }, }, @@ -895,6 +900,241 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], }, }, + { + description: 'MAP_SET with reference to an invalid object is buffered until object becomes valid', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const mapId = liveObjectsHelper.fakeMapObjectId(); + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + expect(root.get('map'), 'Check map does not exist on root until map is valid').to.not.exist; + expect(root.get('counter'), 'Check counter does not exist on root until counter is valid').to.not.exist; + + // send CREATE ops which should make objects valid and add them to the root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapCreateOp({ objectId: mapId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 3, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + + expect( + root.get('map'), + 'Check map exists on root after MAP_CREATE was seen and buffered MAP_SET op was applied', + ).to.exist; + expect( + root.get('counter'), + 'Check counter exists on root after COUNTER_CREATE was seen and buffered MAP_SET op was applied', + ).to.exist; + }, + }, + + { + description: 'MAP_SET with reference to an invalid object triggers subscription callback only when applied', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + // subscribe to updates on root. should only proc once CREATE ops are received for referenced objects + let subscribeCallbackCalledCount = 0; + const keyUpdated = { + map: false, + counter: false, + }; + root.subscribe(({ update }) => { + subscribeCallbackCalledCount++; + Object.keys(update).forEach((x) => (keyUpdated[x] = true)); + }); + + const mapId = liveObjectsHelper.fakeMapObjectId(); + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'map', data: { objectId: mapId } })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + expect(subscribeCallbackCalledCount).to.equal( + 0, + `Check subscription callback on root wasn't called for MAP_SET operations with invalid objects`, + ); + expect(keyUpdated.map).to.equal( + false, + 'Check "map" key was not updated via a subscription callback on root', + ); + expect(keyUpdated.counter).to.equal( + false, + 'Check "counter" key was not updated via a subscription callback on root', + ); + + // send CREATE ops which should make objects valid and add them to the root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapCreateOp({ objectId: mapId })], + }); + + expect(subscribeCallbackCalledCount).to.equal( + 1, + `Check subscription callback for root is called correct number of times once MAP_SET ops are applied for valid objects`, + ); + expect(keyUpdated.map).to.equal(true, 'Check "map" key was updated via a subscription callback on root'); + + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 3, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + + expect(subscribeCallbackCalledCount).to.equal( + 2, + `Check subscription callback for root is called correct number of times once MAP_SET ops are applied for valid objects`, + ); + expect(keyUpdated.counter).to.equal( + true, + 'Check "counter" key was updated via a subscription callback on root', + ); + }, + }, + + { + description: + 'MAP_SET with reference to an invalid object is applied once object becomes valid even if site timeserials have updated', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + // create another object and set it on root with higher timeserial than buffered MAP_SET + const anotherCounterId = liveObjectsHelper.fakeCounterObjectId(); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('bbb', 0, 0), + siteCode: 'bbb', + state: [liveObjectsHelper.counterCreateOp({ objectId: anotherCounterId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 99, 0), // higher timeserial than buffered MAP_SET above + siteCode: 'aaa', + state: [ + liveObjectsHelper.mapSetOp({ + objectId: 'root', + key: 'anotherCounter', + data: { objectId: anotherCounterId }, + }), + ], + }); + + expect( + root.get('anotherCounter'), + 'Check another object was set on root while another MAP_SET operation is buffered', + ).to.exist; + expect(root.get('counter'), 'Check counter does not exist on root until counter is valid').to.not.exist; + + // send CREATE op which should make objects valid and add them to the root + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('bbb', 1, 0), + siteCode: 'bbb', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + + expect( + root.get('counter'), + 'Check counter exists on root after COUNTER_CREATE was seen and buffered MAP_SET op was applied', + ).to.exist; + }, + }, + + { + description: + 'buffered MAP_SET with reference to an invalid object is discarded when new STATE_SYNC sequence starts', + action: async (ctx) => { + const { root, liveObjectsHelper, channel } = ctx; + + const counterId = liveObjectsHelper.fakeCounterObjectId(); + // MAP_SET on root to non-valid object + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 0, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.mapSetOp({ objectId: 'root', key: 'counter', data: { objectId: counterId } })], + }); + + expect(root.get('counter'), 'Check counter does not exist on root as counter is not valid').to.not.exist; + + // inject STATE_SYNC message with empty serial so it is ended immediately + await liveObjectsHelper.processStateObjectMessageOnChannel({ + channel, + syncSerial: 'serial:', + }); + + // send COUNTER_CREATE op and set it on another key on root. only this new MAP_SET op should be applied + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 1, 0), + siteCode: 'aaa', + state: [liveObjectsHelper.counterCreateOp({ objectId: counterId })], + }); + await liveObjectsHelper.processStateOperationMessageOnChannel({ + channel, + serial: lexicoTimeserial('aaa', 2, 0), + siteCode: 'aaa', + state: [ + liveObjectsHelper.mapSetOp({ + objectId: 'root', + key: 'anotherCounterKey', + data: { objectId: counterId }, + }), + ], + }); + + expect( + root.get('counter'), + 'Check MAP_SET for "counter" key was discarded on new STATE_SYNC sequence and not applied on root even when counter became valid', + ).to.not.exist; + expect(root.get('anotherCounterKey'), 'Check valid counter was set on "anotherCounterKey" key on root').to + .exist; + }, + }, + { description: 'can apply MAP_REMOVE state operation messages', action: async (ctx) => { @@ -1126,21 +1366,28 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], } // check only operations with correct timeserials were applied - const expectedCounterValues = [ - 1, - 1, - 11, // applied COUNTER_CREATE - 11, // applied COUNTER_CREATE - 11, // applied COUNTER_CREATE + const expectedCounters = [ + { exists: false }, // COUNTER_CREATE not applied, object is not valid and we should get undefined + { exists: false }, // COUNTER_CREATE not applied, object is not valid and we should get undefined + { exists: true, value: 11 }, // applied COUNTER_CREATE + { exists: true, value: 11 }, // applied COUNTER_CREATE + { exists: true, value: 11 }, // applied COUNTER_CREATE ]; for (const [i, counterId] of counterIds.entries()) { - const expectedValue = expectedCounterValues[i]; + const expectedCounter = expectedCounters[i]; - expect(root.get(counterId).value()).to.equal( - expectedValue, - `Check counter #${i + 1} has expected value after COUNTER_CREATE ops`, - ); + if (!expectedCounter.exists) { + expect( + root.get(counterId), + `Check counter #${i + 1} does not exist on root as COUNTER_CREATE op was not applied`, + ).to.not.exist; + } else { + expect(root.get(counterId).value()).to.equal( + expectedCounter.value, + `Check counter #${i + 1} has expected value after COUNTER_CREATE ops`, + ); + } } }, }, @@ -1426,6 +1673,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'], bbb: lexicoTimeserial('bbb', 2, 0), ccc: lexicoTimeserial('ccc', 5, 0), }, + initialEntries: {}, materialisedEntries: { foo1: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } }, foo2: { timeserial: lexicoTimeserial('bbb', 0, 0), data: { value: 'bar' } },