Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-1076, DTP-1077] Handle isolated create ops and site timeserials vector on the StateObject #1924

2 changes: 1 addition & 1 deletion src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ class RealtimeChannel extends EventEmitter {
}
}

this._liveObjects.handleStateMessages(stateMessages, message.channelSerial);
this._liveObjects.handleStateMessages(stateMessages);

break;
}
Expand Down
145 changes: 92 additions & 53 deletions src/plugins/liveobjects/livecounter.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { LiveObject, LiveObjectData, LiveObjectUpdate, LiveObjectUpdateNoop } from './liveobject';
import { LiveObjects } from './liveobjects';
import { StateCounter, StateCounterOp, StateMessage, StateOperation, StateOperationAction } from './statemessage';
import { Timeserial } from './timeserial';
import { StateCounterOp, StateMessage, StateObject, StateOperation, StateOperationAction } from './statemessage';
import { DefaultTimeserial } from './timeserial';

export interface LiveCounterData extends LiveObjectData {
data: number;
Expand All @@ -12,52 +12,35 @@ export interface LiveCounterUpdate extends LiveObjectUpdate {
}

export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate> {
constructor(
liveObjects: LiveObjects,
private _created: boolean,
initialData?: LiveCounterData | null,
objectId?: string,
regionalTimeserial?: Timeserial,
) {
super(liveObjects, initialData, objectId, regionalTimeserial);
}

/**
* Returns a {@link LiveCounter} instance with a 0 value.
*
* @internal
*/
static zeroValue(
liveobjects: LiveObjects,
isCreated: boolean,
objectId?: string,
regionalTimeserial?: Timeserial,
): LiveCounter {
return new LiveCounter(liveobjects, isCreated, null, objectId, regionalTimeserial);
}

value(): number {
return this._dataRef.data;
static zeroValue(liveobjects: LiveObjects, objectId: string): LiveCounter {
return new LiveCounter(liveobjects, objectId);
}

/**
* Returns a {@link LiveCounter} instance based on the provided state object.
* The provided state object must hold a valid counter object data.
*
* @internal
*/
isCreated(): boolean {
return this._created;
static fromStateObject(liveobjects: LiveObjects, stateObject: StateObject): LiveCounter {
const obj = new LiveCounter(liveobjects, stateObject.objectId);
obj.overrideWithStateObject(stateObject);
return obj;
}

/**
* @internal
*/
setCreated(created: boolean): void {
this._created = created;
value(): number {
return this._dataRef.data;
}

/**
* @internal
*/
applyOperation(op: StateOperation, msg: StateMessage, opRegionalTimeserial: Timeserial): void {
applyOperation(op: StateOperation, msg: StateMessage): void {
if (op.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
Expand All @@ -66,10 +49,24 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

const opOriginTimeserial = DefaultTimeserial.calculateTimeserial(this._client, msg.serial);
if (!this._canApplyOperation(opOriginTimeserial)) {
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter.applyOperation()',
`skipping ${op.action} op: op timeserial ${opOriginTimeserial.toString()} <= site timeserial ${this._siteTimeserials[opOriginTimeserial.siteCode].toString()}; objectId=${this._objectId}`,
);
return;
}
// should update stored site timeserial immediately. doesn't matter if we successfully apply the op,
// as it's important to mark that the op was processed by the object
this._siteTimeserials[opOriginTimeserial.siteCode] = opOriginTimeserial;

let update: LiveCounterUpdate | LiveObjectUpdateNoop;
switch (op.action) {
case StateOperationAction.COUNTER_CREATE:
update = this._applyCounterCreate(op.counter);
update = this._applyCounterCreate(op);
break;

case StateOperationAction.COUNTER_INC:
Expand All @@ -90,19 +87,72 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

this.setRegionalTimeserial(opRegionalTimeserial);
this.notifyUpdated(update);
}

/**
* @internal
*/
overrideWithStateObject(stateObject: StateObject): LiveCounterUpdate {
if (stateObject.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object objectId=${stateObject.objectId}; LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}

if (!this._client.Utils.isNil(stateObject.createOp)) {
// it is expected that create operation can be missing in the state object, so only validate it when it exists
if (stateObject.createOp.objectId !== this.getObjectId()) {
throw new this._client.ErrorInfo(
`Invalid state object: state object createOp objectId=${stateObject.createOp?.objectId}; LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}

if (stateObject.createOp.action !== StateOperationAction.COUNTER_CREATE) {
throw new this._client.ErrorInfo(
`Invalid state object: state object createOp action=${stateObject.createOp?.action}; LiveCounter objectId=${this.getObjectId()}`,
50000,
500,
);
}
}

const previousDataRef = this._dataRef;
// override all relevant data for this object with data from the state object
this._createOperationIsMerged = false;
this._dataRef = { data: stateObject.counter?.count ?? 0 };
this._siteTimeserials = this._timeserialMapFromStringMap(stateObject.siteTimeserials);
if (!this._client.Utils.isNil(stateObject.createOp)) {
this._mergeInitialDataFromCreateOperation(stateObject.createOp);
}

return this._updateFromDataDiff(previousDataRef, this._dataRef);
}

protected _getZeroValueData(): LiveCounterData {
return { data: 0 };
}

protected _updateFromDataDiff(currentDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate {
const counterDiff = newDataRef.data - currentDataRef.data;
protected _updateFromDataDiff(prevDataRef: LiveCounterData, newDataRef: LiveCounterData): LiveCounterUpdate {
const counterDiff = newDataRef.data - prevDataRef.data;
return { update: { inc: counterDiff } };
}

protected _mergeInitialDataFromCreateOperation(stateOperation: StateOperation): LiveCounterUpdate {
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
// note that it is intentional to SUM the incoming count from the create op.
// if we got here, it means that current counter instance is missing the initial value in its data reference,
// which we're going to add now.
this._dataRef.data += stateOperation.counter?.count ?? 0;
this._createOperationIsMerged = true;
VeskeR marked this conversation as resolved.
Show resolved Hide resolved

return { update: { inc: stateOperation.counter?.count ?? 0 } };
}

private _throwNoPayloadError(op: StateOperation): void {
throw new this._client.ErrorInfo(
`No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
Expand All @@ -111,32 +161,21 @@ export class LiveCounter extends LiveObject<LiveCounterData, LiveCounterUpdate>
);
}

private _applyCounterCreate(op: StateCounter | undefined): LiveCounterUpdate | LiveObjectUpdateNoop {
if (this.isCreated()) {
// skip COUNTER_CREATE op if this counter is already created
private _applyCounterCreate(op: StateOperation): LiveCounterUpdate | LiveObjectUpdateNoop {
if (this._createOperationIsMerged) {
// There can't be two different create operation for the same object id, because the object id
// fully encodes that operation. This means we can safely ignore any new incoming create operations
// if we already merged it once.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveCounter._applyCounterCreate()',
`skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`,
`skipping applying COUNTER_CREATE op on a counter instance as it was already applied before; objectId=${this._objectId}`,
);
return { noop: true };
}

if (this._client.Utils.isNil(op)) {
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
// we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation
this.setCreated(true);
return { update: { inc: 0 } };
}

// note that it is intentional to SUM the incoming count from the create op.
// if we get here, it means that current counter instance wasn't initialized from the COUNTER_CREATE op,
// so it is missing the initial value that we're going to add now.
this._dataRef.data += op.count ?? 0;
this.setCreated(true);

return { update: { inc: op.count ?? 0 } };
return this._mergeInitialDataFromCreateOperation(op);
}

private _applyCounterInc(op: StateCounterOp): LiveCounterUpdate {
Expand Down
Loading
Loading