Skip to content

Commit 176c863

Browse files
authored
Merge pull request #1897 from ably/liveobjects/apply-incoming-operations
[DTP-954, DTP-956] Add support for applying incoming state operations outside of STATE_SYNC sequence
2 parents e1e6cc7 + b988c35 commit 176c863

File tree

13 files changed

+1331
-101
lines changed

13 files changed

+1331
-101
lines changed

scripts/moduleReport.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { gzip } from 'zlib';
66
import Table from 'cli-table';
77

88
// The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel)
9-
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 };
9+
const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 101, gzip: 31 };
1010

1111
const baseClientNames = ['BaseRest', 'BaseRealtime'];
1212

@@ -310,12 +310,15 @@ async function checkLiveObjectsPluginFiles() {
310310
// These are the files that are allowed to contribute >= `threshold` bytes to the LiveObjects bundle.
311311
const allowedFiles = new Set([
312312
'src/plugins/liveobjects/index.ts',
313+
'src/plugins/liveobjects/livecounter.ts',
313314
'src/plugins/liveobjects/livemap.ts',
314315
'src/plugins/liveobjects/liveobject.ts',
315316
'src/plugins/liveobjects/liveobjects.ts',
316317
'src/plugins/liveobjects/liveobjectspool.ts',
318+
'src/plugins/liveobjects/objectid.ts',
317319
'src/plugins/liveobjects/statemessage.ts',
318320
'src/plugins/liveobjects/syncliveobjectsdatapool.ts',
321+
'src/plugins/liveobjects/timeserial.ts',
319322
]);
320323

321324
return checkBundleFiles(pluginBundleInfo, allowedFiles, 100);

src/common/lib/client/realtimechannel.ts

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -621,6 +621,39 @@ class RealtimeChannel extends EventEmitter {
621621
break;
622622
}
623623

624+
case actions.STATE: {
625+
if (!this._liveObjects) {
626+
return;
627+
}
628+
629+
const { id, connectionId, timestamp } = message;
630+
const options = this.channelOptions;
631+
632+
const stateMessages = message.state ?? [];
633+
for (let i = 0; i < stateMessages.length; i++) {
634+
try {
635+
const stateMessage = stateMessages[i];
636+
637+
await this.client._LiveObjectsPlugin?.StateMessage.decode(stateMessage, options, decodeData);
638+
639+
if (!stateMessage.connectionId) stateMessage.connectionId = connectionId;
640+
if (!stateMessage.timestamp) stateMessage.timestamp = timestamp;
641+
if (!stateMessage.id) stateMessage.id = id + ':' + i;
642+
} catch (e) {
643+
Logger.logAction(
644+
this.logger,
645+
Logger.LOG_ERROR,
646+
'RealtimeChannel.processMessage()',
647+
(e as Error).toString(),
648+
);
649+
}
650+
}
651+
652+
this._liveObjects.handleStateMessages(stateMessages);
653+
654+
break;
655+
}
656+
624657
case actions.STATE_SYNC: {
625658
if (!this._liveObjects) {
626659
return;
@@ -649,7 +682,7 @@ class RealtimeChannel extends EventEmitter {
649682
}
650683
}
651684

652-
this._liveObjects.handleStateSyncMessage(stateMessages, message.channelSerial);
685+
this._liveObjects.handleStateSyncMessages(stateMessages, message.channelSerial);
653686

654687
break;
655688
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,121 @@
11
import { LiveObject, LiveObjectData } from './liveobject';
2+
import { LiveObjects } from './liveobjects';
3+
import { StateCounter, StateCounterOp, StateOperation, StateOperationAction } from './statemessage';
24

35
export interface LiveCounterData extends LiveObjectData {
46
data: number;
57
}
68

79
export class LiveCounter extends LiveObject<LiveCounterData> {
10+
constructor(
11+
liveObjects: LiveObjects,
12+
private _created: boolean,
13+
initialData?: LiveCounterData | null,
14+
objectId?: string,
15+
) {
16+
super(liveObjects, initialData, objectId);
17+
}
18+
19+
/**
20+
* Returns a {@link LiveCounter} instance with a 0 value.
21+
*
22+
* @internal
23+
*/
24+
static zeroValue(liveobjects: LiveObjects, isCreated: boolean, objectId?: string): LiveCounter {
25+
return new LiveCounter(liveobjects, isCreated, null, objectId);
26+
}
27+
828
value(): number {
929
return this._dataRef.data;
1030
}
1131

32+
/**
33+
* @internal
34+
*/
35+
isCreated(): boolean {
36+
return this._created;
37+
}
38+
39+
/**
40+
* @internal
41+
*/
42+
setCreated(created: boolean): void {
43+
this._created = created;
44+
}
45+
46+
/**
47+
* @internal
48+
*/
49+
applyOperation(op: StateOperation): void {
50+
if (op.objectId !== this.getObjectId()) {
51+
throw new this._client.ErrorInfo(
52+
`Cannot apply state operation with objectId=${op.objectId}, to this LiveCounter with objectId=${this.getObjectId()}`,
53+
50000,
54+
500,
55+
);
56+
}
57+
58+
switch (op.action) {
59+
case StateOperationAction.COUNTER_CREATE:
60+
this._applyCounterCreate(op.counter);
61+
break;
62+
63+
case StateOperationAction.COUNTER_INC:
64+
if (this._client.Utils.isNil(op.counterOp)) {
65+
this._throwNoPayloadError(op);
66+
} else {
67+
this._applyCounterInc(op.counterOp);
68+
}
69+
break;
70+
71+
default:
72+
throw new this._client.ErrorInfo(
73+
`Invalid ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
74+
50000,
75+
500,
76+
);
77+
}
78+
}
79+
1280
protected _getZeroValueData(): LiveCounterData {
1381
return { data: 0 };
1482
}
83+
84+
private _throwNoPayloadError(op: StateOperation): void {
85+
throw new this._client.ErrorInfo(
86+
`No payload found for ${op.action} op for LiveCounter objectId=${this.getObjectId()}`,
87+
50000,
88+
500,
89+
);
90+
}
91+
92+
private _applyCounterCreate(op: StateCounter | undefined): void {
93+
if (this.isCreated()) {
94+
// skip COUNTER_CREATE op if this counter is already created
95+
this._client.Logger.logAction(
96+
this._client.logger,
97+
this._client.Logger.LOG_MICRO,
98+
'LiveCounter._applyCounterCreate()',
99+
`skipping applying COUNTER_CREATE op on a counter instance as it is already created; objectId=${this._objectId}`,
100+
);
101+
return;
102+
}
103+
104+
if (this._client.Utils.isNil(op)) {
105+
// if a counter object is missing for the COUNTER_CREATE op, the initial value is implicitly 0 in this case.
106+
// we need to SUM the initial value to the current value due to the reasons below, but since it's a 0, we can skip addition operation
107+
this.setCreated(true);
108+
return;
109+
}
110+
111+
// note that it is intentional to SUM the incoming count from the create op.
112+
// if we get here, it means that current counter instance wasn't initialized from the COUNTER_CREATE op,
113+
// so it is missing the initial value that we're going to add now.
114+
this._dataRef.data += op.count ?? 0;
115+
this.setCreated(true);
116+
}
117+
118+
private _applyCounterInc(op: StateCounterOp): void {
119+
this._dataRef.data += op.amount;
120+
}
15121
}

0 commit comments

Comments
 (0)