Skip to content

Commit

Permalink
Merge pull request #1909 from ably/DTP-955/buffer-ops-during-sync
Browse files Browse the repository at this point in the history
[DTP-955] Buffer and flush state operations during a STATE_SYNC sequence
  • Loading branch information
VeskeR authored Nov 14, 2024
2 parents 385e1c8 + d919f8b commit 82f1271
Show file tree
Hide file tree
Showing 8 changed files with 577 additions and 118 deletions.
2 changes: 1 addition & 1 deletion src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter {
}
}

this._liveObjects.handleStateMessages(stateMessages);
this._liveObjects.handleStateMessages(stateMessages, message.channelSerial);

break;
}
Expand Down
19 changes: 14 additions & 5 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LiveObject, LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounter, StateCounterOp, StateOperation, StateOperationAction } from './statemessage';
import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage';
import { Timeserial } from './timeserial';

export interface LiveCounterData extends LiveObjectData {
data: number;
Expand All @@ -12,17 +13,23 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
private _created: boolean,
initialData?: LiveCounterData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId);
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveCounter} instance with a 0 value.
*
* @internal
*/
static zeroValue(liveobjects: LiveObjects, isCreated: boolean, objectId?: string): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId);
static zeroValue(
liveobjects: LiveObjects,
isCreated: boolean,
objectId?: string,
regionalTimeserial?: Timeserial,
): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId, regionalTimeserial);
}

value(): number {
Expand All @@ -46,7 +53,7 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
/**
* @internal
*/
applyOperation(op: StateOperation): void {
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -75,6 +82,8 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
500,
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
}

protected _getZeroValueData(): LiveCounterData {
Expand Down
11 changes: 7 additions & 4 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ export class LiveMap extends LiveObject<LiveMapData> {
private _semantics: MapSemantics,
initialData?: LiveMapData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId);
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveMap} instance with an empty map data.
*
* @internal
*/
static zeroValue(liveobjects: LiveObjects, objectId?: string): LiveMap {
return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId);
static zeroValue(liveobjects: LiveObjects, objectId?: string, regionalTimeserial?: Timeserial): LiveMap {
return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId, regionalTimeserial);
}

static liveMapDataFromMapEntries(client: BaseClient, entries: Record<string, StateMapEntry>): LiveMapData {
Expand Down Expand Up @@ -134,7 +135,7 @@ export class LiveMap extends LiveObject<LiveMapData> {
/**
* @internal
*/
applyOperation(op: StateOperation, msg: StateMessage): void {
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -171,6 +172,8 @@ export class LiveMap extends LiveObject<LiveMapData> {
500,
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
}

protected _getZeroValueData(): LiveMapData {
Expand Down
12 changes: 8 additions & 4 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type BaseClient from 'common/lib/client/baseclient';
import { LiveObjects } from './liveobjects';
import { StateMessage, StateOperation } from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

export interface LiveObjectData {
data: any;
Expand All @@ -10,16 +11,19 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
protected _client: BaseClient;
protected _dataRef: T;
protected _objectId: string;
protected _regionalTimeserial?: string;
protected _regionalTimeserial: Timeserial;

constructor(
protected _liveObjects: LiveObjects,
initialData?: T | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
this._client = this._liveObjects.getClient();
this._dataRef = initialData ?? this._getZeroValueData();
this._objectId = objectId ?? this._createObjectId();
// use zero value timeserial by default, so any future operation can be applied for this object
this._regionalTimeserial = regionalTimeserial ?? DefaultTimeserial.zeroValueTimeserial(this._client);
}

/**
Expand All @@ -32,7 +36,7 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
getRegionalTimeserial(): string | undefined {
getRegionalTimeserial(): Timeserial {
return this._regionalTimeserial;
}

Expand All @@ -46,7 +50,7 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
setRegionalTimeserial(regionalTimeserial: string): void {
setRegionalTimeserial(regionalTimeserial: Timeserial): void {
this._regionalTimeserial = regionalTimeserial;
}

Expand All @@ -58,6 +62,6 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
abstract applyOperation(op: StateOperation, msg: StateMessage): void;
abstract applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void;
protected abstract _getZeroValueData(): T;
}
43 changes: 33 additions & 10 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,17 @@ import { LiveObject } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';
import { DefaultTimeserial, Timeserial } from './timeserial';

enum LiveObjectsEvents {
SyncCompleted = 'SyncCompleted',
}

export interface BufferedStateMessage {
stateMessage: StateMessage;
regionalTimeserial: Timeserial;
}

export class LiveObjects {
private _client: BaseClient;
private _channel: RealtimeChannel;
Expand All @@ -24,6 +30,7 @@ export class LiveObjects {
private _syncInProgress: boolean;
private _currentSyncId: string | undefined;
private _currentSyncCursor: string | undefined;
private _bufferedStateOperations: BufferedStateMessage[];

constructor(channel: RealtimeChannel) {
this._channel = channel;
Expand All @@ -32,6 +39,7 @@ export class LiveObjects {
this._liveObjectsPool = new LiveObjectsPool(this);
this._syncLiveObjectsDataPool = new SyncLiveObjectsDataPool(this);
this._syncInProgress = true;
this._bufferedStateOperations = [];
}

async getRoot(): Promise<LiveMap> {
Expand Down Expand Up @@ -84,12 +92,23 @@ export class LiveObjects {
/**
* @internal
*/
handleStateMessages(stateMessages: StateMessage[]): void {
handleStateMessages(stateMessages: StateMessage[], msgRegionalTimeserial: string | null | undefined): void {
const timeserial = DefaultTimeserial.calculateTimeserial(this._client, msgRegionalTimeserial);

if (this._syncInProgress) {
// TODO: handle buffering of state messages during SYNC
// The client receives state messages in realtime over the channel concurrently with the SYNC sequence.
// Some of the incoming state messages may have already been applied to the state objects described in
// the SYNC sequence, but others may not; therefore we must buffer these messages so that we can apply
// them to the state objects once the SYNC is complete. To avoid double-counting, the buffered operations
// are applied according to the state object's regional timeserial, which reflects the regional timeserial
// of the state message that was last applied to that state object.
stateMessages.forEach((x) =>
this._bufferedStateOperations.push({ stateMessage: x, regionalTimeserial: timeserial }),
);
return;
}

this._liveObjectsPool.applyStateMessages(stateMessages);
this._liveObjectsPool.applyStateMessages(stateMessages, timeserial);
}

/**
Expand All @@ -100,7 +119,7 @@ export class LiveObjects {
this._client.logger,
this._client.Logger.LOG_MINOR,
'LiveObjects.onAttached()',
'channel = ' + this._channel.name + ', hasState = ' + hasState,
`channel=${this._channel.name}, hasState=${hasState}`,
);

if (hasState) {
Expand Down Expand Up @@ -135,16 +154,20 @@ export class LiveObjects {
}

private _startNewSync(syncId?: string, syncCursor?: string): void {
// need to discard all buffered state operation messages on new sync start
this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = syncId;
this._currentSyncCursor = syncCursor;
this._syncInProgress = true;
}

private _endSync(): void {
// TODO: handle applying buffered state messages when SYNC is finished

this._applySync();
// should apply buffered state operations after we applied the SYNC data
this._liveObjectsPool.applyBufferedStateMessages(this._bufferedStateOperations);

this._bufferedStateOperations = [];
this._syncLiveObjectsDataPool.reset();
this._currentSyncId = undefined;
this._currentSyncCursor = undefined;
Expand Down Expand Up @@ -180,10 +203,11 @@ export class LiveObjects {
for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) {
receivedObjectIds.add(objectId);
const existingObject = this._liveObjectsPool.get(objectId);
const regionalTimeserialObj = DefaultTimeserial.calculateTimeserial(this._client, entry.regionalTimeserial);

if (existingObject) {
existingObject.setData(entry.objectData);
existingObject.setRegionalTimeserial(entry.regionalTimeserial);
existingObject.setRegionalTimeserial(regionalTimeserialObj);
if (existingObject instanceof LiveCounter) {
existingObject.setCreated((entry as LiveCounterDataEntry).created);
}
Expand All @@ -195,17 +219,16 @@ export class LiveObjects {
const objectType = entry.objectType;
switch (objectType) {
case 'LiveCounter':
newObject = new LiveCounter(this, entry.created, entry.objectData, objectId);
newObject = new LiveCounter(this, entry.created, entry.objectData, objectId, regionalTimeserialObj);
break;

case 'LiveMap':
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId);
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId, regionalTimeserialObj);
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 50000, 500);
}
newObject.setRegionalTimeserial(entry.regionalTimeserial);

this._liveObjectsPool.set(objectId, newObject);
}
Expand Down
Loading

0 comments on commit 82f1271

Please sign in to comment.