Skip to content

Commit

Permalink
Handle regional timeserials for LiveObjects
Browse files Browse the repository at this point in the history
Regional timeserial for a LiveObject:
- is set to StateObject.regionalTimeserial when object is created
during SYNC sequence
- is set to message's channelSerial property when object is
created via a state operation message
- is updated to message's channelSerial property when an operation is
applied on an object via a state operation message
- is equal to zero-value Timeserial (`@0-0`) when creating a zero-value
object
  • Loading branch information
VeskeR committed Oct 30, 2024
1 parent 65c3f0f commit 7ccfe50
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter {
}
}

this._liveObjects.handleStateMessages(stateMessages);
this._liveObjects.handleStateMessages(stateMessages, message.channelSerial);

break;
}
Expand Down
19 changes: 14 additions & 5 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { LiveObject, LiveObjectData } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounter, StateCounterOp, StateOperation, StateOperationAction } from './statemessage';
import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage';
import { Timeserial } from './timeserial';

export interface LiveCounterData extends LiveObjectData {
data: number;
Expand All @@ -12,17 +13,23 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
private _created: boolean,
initialData?: LiveCounterData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId);
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveCounter} instance with a 0 value.
*
* @internal
*/
static zeroValue(liveobjects: LiveObjects, isCreated: boolean, objectId?: string): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId);
static zeroValue(
liveobjects: LiveObjects,
isCreated: boolean,
objectId?: string,
regionalTimeserial?: Timeserial,
): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId, regionalTimeserial);
}

value(): number {
Expand All @@ -46,7 +53,7 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
/**
* @internal
*/
applyOperation(op: StateOperation): void {
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -75,6 +82,8 @@ export class LiveCounter extends LiveObject<LiveCounterData> {
500,
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
}

protected _getZeroValueData(): LiveCounterData {
Expand Down
11 changes: 7 additions & 4 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,18 @@ export class LiveMap extends LiveObject<LiveMapData> {
private _semantics: MapSemantics,
initialData?: LiveMapData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId);
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveMap} instance with an empty map data.
*
* @internal
*/
static zeroValue(liveobjects: LiveObjects, objectId?: string): LiveMap {
return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId);
static zeroValue(liveobjects: LiveObjects, objectId?: string, regionalTimeserial?: Timeserial): LiveMap {
return new LiveMap(liveobjects, MapSemantics.LWW, null, objectId, regionalTimeserial);
}

static liveMapDataFromMapEntries(client: BaseClient, entries: Record<string, StateMapEntry>): LiveMapData {
Expand Down Expand Up @@ -134,7 +135,7 @@ export class LiveMap extends LiveObject<LiveMapData> {
/**
* @internal
*/
applyOperation(op: StateOperation, msg: StateMessage): void {
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveMap with objectId=${this.getObjectId()}`,
Expand Down Expand Up @@ -171,6 +172,8 @@ export class LiveMap extends LiveObject<LiveMapData> {
500,
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
}

protected _getZeroValueData(): LiveMapData {
Expand Down
12 changes: 8 additions & 4 deletions src/plugins/liveobjects/liveobject.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type BaseClient from 'common/lib/client/baseclient';
import { LiveObjects } from './liveobjects';
import { StateMessage, StateOperation } from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

export interface LiveObjectData {
data: any;
Expand All @@ -10,16 +11,19 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
protected _client: BaseClient;
protected _dataRef: T;
protected _objectId: string;
protected _regionalTimeserial?: string;
protected _regionalTimeserial: Timeserial;

constructor(
protected _liveObjects: LiveObjects,
initialData?: T | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
this._client = this._liveObjects.getClient();
this._dataRef = initialData ?? this._getZeroValueData();
this._objectId = objectId ?? this._createObjectId();
// use zero value timeserial by default, so any future operation can be applied for this object
this._regionalTimeserial = regionalTimeserial ?? DefaultTimeserial.zeroValueTimeserial(this._client);
}

/**
Expand All @@ -32,7 +36,7 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
getRegionalTimeserial(): string | undefined {
getRegionalTimeserial(): Timeserial {
return this._regionalTimeserial;
}

Expand All @@ -46,7 +50,7 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
setRegionalTimeserial(regionalTimeserial: string): void {
setRegionalTimeserial(regionalTimeserial: Timeserial): void {
this._regionalTimeserial = regionalTimeserial;
}

Expand All @@ -58,6 +62,6 @@ export abstract class LiveObject<T extends LiveObjectData = LiveObjectData> {
/**
* @internal
*/
abstract applyOperation(op: StateOperation, msg: StateMessage): void;
abstract applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void;
protected abstract _getZeroValueData(): T;
}
14 changes: 8 additions & 6 deletions src/plugins/liveobjects/liveobjects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { LiveObject } from './liveobject';
import { LiveObjectsPool, ROOT_OBJECT_ID } from './liveobjectspool';
import { StateMessage } from './statemessage';
import { LiveCounterDataEntry, SyncLiveObjectsDataPool } from './syncliveobjectsdatapool';
import { DefaultTimeserial } from './timeserial';

enum LiveObjectsEvents {
SyncCompleted = 'SyncCompleted',
Expand Down Expand Up @@ -84,12 +85,13 @@ export class LiveObjects {
/**
* @internal
*/
handleStateMessages(stateMessages: StateMessage[]): void {
handleStateMessages(stateMessages: StateMessage[], msgRegionalTimeserial: string | null | undefined): void {
const timeserial = DefaultTimeserial.calculateTimeserial(this._client, msgRegionalTimeserial);
if (this._syncInProgress) {
// TODO: handle buffering of state messages during SYNC
}

this._liveObjectsPool.applyStateMessages(stateMessages);
this._liveObjectsPool.applyStateMessages(stateMessages, timeserial);
}

/**
Expand Down Expand Up @@ -180,10 +182,11 @@ export class LiveObjects {
for (const [objectId, entry] of this._syncLiveObjectsDataPool.entries()) {
receivedObjectIds.add(objectId);
const existingObject = this._liveObjectsPool.get(objectId);
const regionalTimeserialObj = DefaultTimeserial.calculateTimeserial(this._client, entry.regionalTimeserial);

if (existingObject) {
existingObject.setData(entry.objectData);
existingObject.setRegionalTimeserial(entry.regionalTimeserial);
existingObject.setRegionalTimeserial(regionalTimeserialObj);
if (existingObject instanceof LiveCounter) {
existingObject.setCreated((entry as LiveCounterDataEntry).created);
}
Expand All @@ -195,17 +198,16 @@ export class LiveObjects {
const objectType = entry.objectType;
switch (objectType) {
case 'LiveCounter':
newObject = new LiveCounter(this, entry.created, entry.objectData, objectId);
newObject = new LiveCounter(this, entry.created, entry.objectData, objectId, regionalTimeserialObj);
break;

case 'LiveMap':
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId);
newObject = new LiveMap(this, entry.semantics, entry.objectData, objectId, regionalTimeserialObj);
break;

default:
throw new this._client.ErrorInfo(`Unknown live object type: ${objectType}`, 50000, 500);
}
newObject.setRegionalTimeserial(entry.regionalTimeserial);

this._liveObjectsPool.set(objectId, newObject);
}
Expand Down
27 changes: 16 additions & 11 deletions src/plugins/liveobjects/liveobjectspool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { LiveObject } from './liveobject';
import { LiveObjects } from './liveobjects';
import { ObjectId } from './objectid';
import { MapSemantics, StateMessage, StateOperation, StateOperationAction } from './statemessage';
import { DefaultTimeserial, Timeserial } from './timeserial';

export const ROOT_OBJECT_ID = 'root';

Expand Down Expand Up @@ -51,22 +52,24 @@ export class LiveObjectsPool {
}

const parsedObjectId = ObjectId.fromString(this._client, objectId);
// use zero value timeserial, so any operation can be applied for this object
const regionalTimeserial = DefaultTimeserial.zeroValueTimeserial(this._client);
let zeroValueObject: LiveObject;
switch (parsedObjectId.type) {
case 'map': {
zeroValueObject = LiveMap.zeroValue(this._liveObjects, objectId);
zeroValueObject = LiveMap.zeroValue(this._liveObjects, objectId, regionalTimeserial);
break;
}

case 'counter':
zeroValueObject = LiveCounter.zeroValue(this._liveObjects, false, objectId);
zeroValueObject = LiveCounter.zeroValue(this._liveObjects, false, objectId, regionalTimeserial);
break;
}

this.set(objectId, zeroValueObject);
}

applyStateMessages(stateMessages: StateMessage[]): void {
applyStateMessages(stateMessages: StateMessage[], regionalTimeserial: Timeserial): void {
for (const stateMessage of stateMessages) {
if (!stateMessage.operation) {
this._client.Logger.logAction(
Expand All @@ -87,17 +90,17 @@ export class LiveObjectsPool {
// object wich such id already exists (we may have created a zero-value object before, or this is a duplicate *_CREATE op),
// so delegate application of the op to that object
// TODO: invoke subscription callbacks for an object when applied
this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial);
break;
}

// otherwise we can create new objects in the pool
if (stateOperation.action === StateOperationAction.MAP_CREATE) {
this._handleMapCreate(stateOperation);
this._handleMapCreate(stateOperation, regionalTimeserial);
}

if (stateOperation.action === StateOperationAction.COUNTER_CREATE) {
this._handleCounterCreate(stateOperation);
this._handleCounterCreate(stateOperation, regionalTimeserial);
}
break;

Expand All @@ -109,7 +112,7 @@ export class LiveObjectsPool {
// when we eventually receive a corresponding *_CREATE op for that object, its application will be handled by that zero-value object.
this.createZeroValueObjectIfNotExists(stateOperation.objectId);
// TODO: invoke subscription callbacks for an object when applied
this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage);
this.get(stateOperation.objectId)!.applyOperation(stateOperation, stateMessage, regionalTimeserial);
break;

default:
Expand All @@ -130,35 +133,37 @@ export class LiveObjectsPool {
return pool;
}

private _handleCounterCreate(stateOperation: StateOperation): void {
private _handleCounterCreate(stateOperation: StateOperation, opRegionalTimeserial: Timeserial): void {
let counter: LiveCounter;
if (this._client.Utils.isNil(stateOperation.counter)) {
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly a zero-value counter.
counter = LiveCounter.zeroValue(this._liveObjects, true, stateOperation.objectId);
counter = LiveCounter.zeroValue(this._liveObjects, true, stateOperation.objectId, opRegionalTimeserial);
} else {
counter = new LiveCounter(
this._liveObjects,
true,
{ data: stateOperation.counter.count ?? 0 },
stateOperation.objectId,
opRegionalTimeserial,
);
}

this.set(stateOperation.objectId, counter);
}

private _handleMapCreate(stateOperation: StateOperation): void {
private _handleMapCreate(stateOperation: StateOperation, opRegionalTimeserial: Timeserial): void {
let map: LiveMap;
if (this._client.Utils.isNil(stateOperation.map)) {
// if a map object is missing for the MAP_CREATE op, the initial value is implicitly a zero-value map.
map = LiveMap.zeroValue(this._liveObjects, stateOperation.objectId);
map = LiveMap.zeroValue(this._liveObjects, stateOperation.objectId, opRegionalTimeserial);
} else {
const objectData = LiveMap.liveMapDataFromMapEntries(this._client, stateOperation.map.entries ?? {});
map = new LiveMap(
this._liveObjects,
stateOperation.map.semantics ?? MapSemantics.LWW,
objectData,
stateOperation.objectId,
opRegionalTimeserial,
);
}

Expand Down

0 comments on commit 7ccfe50

Please sign in to comment.