Skip to content

Commit

Permalink
Apply timeserial dependant operations only if op timeserial is > than…
Browse files Browse the repository at this point in the history
… object/entry timeserial
  • Loading branch information
VeskeR committed Nov 5, 2024
1 parent ef7b975 commit e763a30
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 25 deletions.
18 changes: 12 additions & 6 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,13 +224,16 @@ export class LiveMap extends LiveObject<LiveMapData> {
const { ErrorInfo, Utils } = this._client;

const existingEntry = this._dataRef.data.get(op.key);
if (existingEntry && opOriginTimeserial.before(existingEntry.timeserial)) {
// the operation's origin timeserial < the entry's timeserial, ignore the operation.
if (
existingEntry &&
(opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial))
) {
// the operation's origin timeserial <= the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapSet()',
`skipping updating key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserial.toString()}; objectId=${this._objectId}`,
`skipping update for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`,
);
return;
}
Expand Down Expand Up @@ -271,13 +274,16 @@ export class LiveMap extends LiveObject<LiveMapData> {

private _applyMapRemove(op: StateMapOp, opOriginTimeserial: Timeserial): void {
const existingEntry = this._dataRef.data.get(op.key);
if (existingEntry && opOriginTimeserial.before(existingEntry.timeserial)) {
// the operation's origin timeserial < the entry's timeserial, ignore the operation.
if (
existingEntry &&
(opOriginTimeserial.before(existingEntry.timeserial) || opOriginTimeserial.equal(existingEntry.timeserial))
) {
// the operation's origin timeserial <= the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapRemove()',
`skipping removing key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserial.toString()}; objectId=${this._objectId}`,
`skipping remove for key="${op.key}": op timeserial ${opOriginTimeserial.toString()} <= entry timeserial ${existingEntry.timeserial.toString()}; objectId=${this._objectId}`,
);
return;
}
Expand Down
9 changes: 6 additions & 3 deletions src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,13 +151,16 @@ export class LiveObjectsPool {
}

// otherwise we need to compare regional timeserials
if (regionalTimeserial.before(existingObject.getRegionalTimeserial())) {
// the operation's regional timeserial < the object's timeserial, ignore the operation.
if (
regionalTimeserial.before(existingObject.getRegionalTimeserial()) ||
regionalTimeserial.equal(existingObject.getRegionalTimeserial())
) {
// the operation's regional timeserial <= the object's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveObjects.LiveObjectsPool.applyBufferedStateMessages()',
`skipping applying buffered state operation message as existing object has greater regional timeserial: ${existingObject.getRegionalTimeserial().toString()}, than the op: ${regionalTimeserial.toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`,
`skipping buffered state operation message: op regional timeserial ${regionalTimeserial.toString()} <= object regional timeserial ${existingObject.getRegionalTimeserial().toString()}; objectId=${stateMessage.operation.objectId}, message id: ${stateMessage.id}, channel: ${this._channel.name}`,
);
continue;
}
Expand Down
36 changes: 20 additions & 16 deletions test/realtime/live_objects.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -1070,23 +1070,27 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
],
});

// inject operations with older regional timeserial, expect them not to be applied when sync ends
// inject operations with older or equal regional timeserial, expect them not to be applied when sync ends
await Promise.all(
['root', mapId].flatMap((objectId) =>
primitiveKeyData.map((keyData) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })],
}),
),
),
['@0-0', '@1-0'].map(async (serial) => {
await Promise.all(
['root', mapId].flatMap((objectId) =>
primitiveKeyData.map((keyData) =>
liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial,
state: [liveObjectsHelper.mapSetOp({ objectId, key: keyData.key, data: keyData.data })],
}),
),
),
);
await liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial,
state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })],
});
}),
);
await liveObjectsHelper.processStateOperationMessageOnChannel({
channel,
serial: '@0-0',
state: [liveObjectsHelper.counterIncOp({ objectId: counterId, amount: 1 })],
});

// inject operations with greater regional timeserial, expect them to be applied when sync ends
await Promise.all(
Expand All @@ -1110,7 +1114,7 @@ define(['ably', 'shared_helper', 'chai', 'live_objects', 'live_objects_helper'],
syncSerial: 'serial:',
});

// check operations with older regional timeserial are not applied
// check operations with older or equal regional timeserial are not applied
// counter will be checked to match an expected value explicitly, so no need to check that it doesn't equal a sum of operations
primitiveKeyData.forEach((keyData) => {
expect(
Expand Down

0 comments on commit e763a30

Please sign in to comment.