Skip to content

Commit

Permalink
Merge pull request #1908 from ably/liveobjects/timeserial-fix
Browse files Browse the repository at this point in the history
Timeserial fixes for LiveObjects
  • Loading branch information
VeskeR authored Nov 8, 2024
2 parents 176c863 + 65c3f0f commit 385e1c8
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 33 deletions.
34 changes: 18 additions & 16 deletions src/plugins/liveobjects/livemap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ export class LiveMap extends LiveObject<LiveMapData> {

const liveDataEntry: MapEntry = {
...entry,
timeserial: DefaultTimeserial.calculateTimeserial(client, entry.timeserial),
timeserial: entry.timeserial
? DefaultTimeserial.calculateTimeserial(client, entry.timeserial)
: DefaultTimeserial.zeroValueTimeserial(client),
// true only if we received explicit true. otherwise always false
tombstone: entry.tombstone === true,
data: liveData,
Expand Down Expand Up @@ -150,15 +152,15 @@ export class LiveMap extends LiveObject<LiveMapData> {
if (this._client.Utils.isNil(op.mapOp)) {
this._throwNoPayloadError(op);
} else {
this._applyMapSet(op.mapOp, msg.serial);
this._applyMapSet(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial));
}
break;

case StateOperationAction.MAP_REMOVE:
if (this._client.Utils.isNil(op.mapOp)) {
this._throwNoPayloadError(op);
} else {
this._applyMapRemove(op.mapOp, msg.serial);
this._applyMapRemove(op.mapOp, DefaultTimeserial.calculateTimeserial(this._client, msg.serial));
}
break;

Expand Down Expand Up @@ -202,7 +204,9 @@ export class LiveMap extends LiveObject<LiveMapData> {
// we can do this by iterating over entries from MAP_CREATE op and apply changes on per-key basis as if we had MAP_SET, MAP_REMOVE operations.
Object.entries(op.entries ?? {}).forEach(([key, entry]) => {
// for MAP_CREATE op we must use dedicated timeserial field available on an entry, instead of a timeserial on a message
const opOriginTimeserial = entry.timeserial;
const opOriginTimeserial = entry.timeserial
? DefaultTimeserial.calculateTimeserial(this._client, entry.timeserial)
: DefaultTimeserial.zeroValueTimeserial(this._client);
if (entry.tombstone === true) {
// entry in MAP_CREATE op is deleted, try to apply MAP_REMOVE op
this._applyMapRemove({ key }, opOriginTimeserial);
Expand All @@ -213,18 +217,17 @@ export class LiveMap extends LiveObject<LiveMapData> {
});
}

private _applyMapSet(op: StateMapOp, opOriginTimeserialStr: string | undefined): void {
private _applyMapSet(op: StateMapOp, opOriginTimeserial: Timeserial): void {
const { ErrorInfo, Utils } = this._client;

const opTimeserial = DefaultTimeserial.calculateTimeserial(this._client, opOriginTimeserialStr);
const existingEntry = this._dataRef.data.get(op.key);
if (existingEntry && opTimeserial.before(existingEntry.timeserial)) {
if (existingEntry && opOriginTimeserial.before(existingEntry.timeserial)) {
// the operation's origin timeserial < the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapSet()',
`skipping updating key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserialStr}; objectId=${this._objectId}`,
`skipping updating key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserial.toString()}; objectId=${this._objectId}`,
);
return;
}
Expand All @@ -251,40 +254,39 @@ export class LiveMap extends LiveObject<LiveMapData> {

if (existingEntry) {
existingEntry.tombstone = false;
existingEntry.timeserial = opTimeserial;
existingEntry.timeserial = opOriginTimeserial;
existingEntry.data = liveData;
} else {
const newEntry: MapEntry = {
tombstone: false,
timeserial: opTimeserial,
timeserial: opOriginTimeserial,
data: liveData,
};
this._dataRef.data.set(op.key, newEntry);
}
}

private _applyMapRemove(op: StateMapOp, opOriginTimeserialStr: string | undefined): void {
const opTimeserial = DefaultTimeserial.calculateTimeserial(this._client, opOriginTimeserialStr);
private _applyMapRemove(op: StateMapOp, opOriginTimeserial: Timeserial): void {
const existingEntry = this._dataRef.data.get(op.key);
if (existingEntry && opTimeserial.before(existingEntry.timeserial)) {
if (existingEntry && opOriginTimeserial.before(existingEntry.timeserial)) {
// the operation's origin timeserial < the entry's timeserial, ignore the operation.
this._client.Logger.logAction(
this._client.logger,
this._client.Logger.LOG_MICRO,
'LiveMap._applyMapRemove()',
`skipping removing key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserialStr}; objectId=${this._objectId}`,
`skipping removing key="${op.key}" as existing key entry has greater timeserial: ${existingEntry.timeserial.toString()}, than the op: ${opOriginTimeserial.toString()}; objectId=${this._objectId}`,
);
return;
}

if (existingEntry) {
existingEntry.tombstone = true;
existingEntry.timeserial = opTimeserial;
existingEntry.timeserial = opOriginTimeserial;
existingEntry.data = undefined;
} else {
const newEntry: MapEntry = {
tombstone: true,
timeserial: opTimeserial,
timeserial: opOriginTimeserial,
data: undefined,
};
this._dataRef.data.set(op.key, newEntry);
Expand Down
9 changes: 7 additions & 2 deletions src/plugins/liveobjects/statemessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,13 @@ export interface StateCounterOp {
export interface StateMapEntry {
/** Indicates whether the map entry has been removed. */
tombstone?: boolean;
/** The *origin* timeserial of the last operation that was applied to the map entry. */
timeserial: string;
/**
* The *origin* timeserial of the last operation that was applied to the map entry.
*
* It is optional in a MAP_CREATE operation and might be missing, in which case the client should default to using zero-value timeserial,
* which is the "earliest possible" timeserial. This will allow any other operation to update the field based on a timeserial comparison.
*/
timeserial?: string;
/** The data that represents the value of the map entry. */
data: StateData;
}
Expand Down
33 changes: 18 additions & 15 deletions src/plugins/liveobjects/timeserial.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ export class DefaultTimeserial implements Timeserial {
}

const [seriesId, rest] = timeserial.split('@');
if (!seriesId || !rest) {
if (!rest) {
throw new client.ErrorInfo(`Invalid timeserial: ${timeserial}`, 50000, 500);
}

Expand All @@ -101,6 +101,15 @@ export class DefaultTimeserial implements Timeserial {
);
}

/**
* Returns a zero-value Timeserial `@0-0` - "earliest possible" timeserial.
*
* @returns The timeserial object.
*/
static zeroValueTimeserial(client: BaseClient): Timeserial {
return new DefaultTimeserial(client, '', 0, 0); // @0-0
}

/**
* Compares this timeserial to the supplied timeserial, returning a number indicating their relative order.
* @param timeserialToCompare The timeserial to compare against. Can be a string or a Timeserial object.
Expand All @@ -125,20 +134,14 @@ export class DefaultTimeserial implements Timeserial {
return counterDiff;
}

// Compare the seriesId
// An empty seriesId is considered less than a non-empty one
if (!this.seriesId && secondTimeserial.seriesId) {
return -1;
}
if (this.seriesId && !secondTimeserial.seriesId) {
return 1;
}
// Otherwise compare seriesId lexicographically
const seriesIdDiff =
this.seriesId === secondTimeserial.seriesId ? 0 : this.seriesId < secondTimeserial.seriesId ? -1 : 1;

if (seriesIdDiff) {
return seriesIdDiff;
// Compare the seriesId lexicographically, but only if both seriesId exist
const seriesComparison =
this.seriesId &&
secondTimeserial.seriesId &&
this.seriesId !== secondTimeserial.seriesId &&
(this.seriesId > secondTimeserial.seriesId ? 1 : -1);
if (seriesComparison) {
return seriesComparison;
}

// Compare the index, if present
Expand Down

0 comments on commit 385e1c8

Please sign in to comment.