Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
485 changes: 353 additions & 132 deletions ably.d.ts

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions scripts/moduleReport.ts
Original file line number Diff line number Diff line change
Expand Up @@ -328,8 +328,6 @@ async function checkObjectsPluginFiles() {
// These are the files that are allowed to contribute >= `threshold` bytes to the Objects bundle.
const allowedFiles = new Set([
'src/plugins/objects/batchcontext.ts',
'src/plugins/objects/batchcontextlivecounter.ts',
'src/plugins/objects/batchcontextlivemap.ts',
'src/plugins/objects/index.ts',
'src/plugins/objects/instance.ts',
'src/plugins/objects/livecounter.ts',
Expand All @@ -343,6 +341,7 @@ async function checkObjectsPluginFiles() {
'src/plugins/objects/pathobject.ts',
'src/plugins/objects/pathobjectsubscriptionregister.ts',
'src/plugins/objects/realtimeobject.ts',
'src/plugins/objects/rootbatchcontext.ts',
'src/plugins/objects/syncobjectsdatapool.ts',
]);

Expand Down
161 changes: 86 additions & 75 deletions src/plugins/objects/batchcontext.ts
Original file line number Diff line number Diff line change
@@ -1,107 +1,118 @@
import type BaseClient from 'common/lib/client/baseclient';
import type * as API from '../../../ably';
import { BatchContextLiveCounter } from './batchcontextlivecounter';
import { BatchContextLiveMap } from './batchcontextlivemap';
import { ROOT_OBJECT_ID } from './constants';
import type { AnyBatchContext, BatchContext, CompactedValue, Instance, Primitive, Value } from '../../../ably';
import { DefaultInstance } from './instance';
import { LiveCounter } from './livecounter';
import { LiveMap } from './livemap';
import { ObjectMessage } from './objectmessage';
import { RealtimeObject } from './realtimeobject';
import { RootBatchContext } from './rootbatchcontext';

export class BatchContext {
private _client: BaseClient;
/** Maps object ids to the corresponding batch context object wrappers */
private _wrappedObjects: Map<string, BatchContextLiveCounter | BatchContextLiveMap<API.LiveMapType>> = new Map();
private _queuedMessages: ObjectMessage[] = [];
private _isClosed = false;
export interface InstanceEvent {
/** Object message that caused this event */
message?: ObjectMessage;
}

export class DefaultBatchContext implements AnyBatchContext {
protected _client: BaseClient;

constructor(
private _realtimeObject: RealtimeObject,
private _root: LiveMap<API.LiveMapType>,
protected _realtimeObject: RealtimeObject,
protected _instance: Instance<Value>,
protected _rootContext: RootBatchContext,
) {
this._client = _realtimeObject.getClient();
this._wrappedObjects.set(this._root.getObjectId(), new BatchContextLiveMap(this, this._realtimeObject, this._root));
this._client = this._realtimeObject.getClient();
}

get<T extends API.LiveMapType = API.AblyDefaultObject>(): BatchContextLiveMap<T> {
this._realtimeObject.throwIfInvalidAccessApiConfiguration();
this.throwIfClosed();
return this.getWrappedObject(ROOT_OBJECT_ID) as BatchContextLiveMap<T>;
get<T extends Value = Value>(key: string): BatchContext<T> | undefined {
this._throwIfClosed();
const instance = this._instance.get(key);
if (!instance) {
return undefined;
}
return this._rootContext.wrapInstance(instance) as unknown as BatchContext<T>;
}

/**
* @internal
*/
getWrappedObject(objectId: string): BatchContextLiveCounter | BatchContextLiveMap<API.LiveMapType> | undefined {
if (this._wrappedObjects.has(objectId)) {
return this._wrappedObjects.get(objectId);
}
value<T extends Primitive = Primitive>(): T | undefined {
this._throwIfClosed();
return this._instance.value();
}

const originObject = this._realtimeObject.getPool().get(objectId);
if (!originObject) {
return undefined;
compact<T extends Value = Value>(): CompactedValue<T> | undefined {
this._throwIfClosed();
return this._instance.compact();
}

id(): string | undefined {
this._throwIfClosed();
return this._instance.id();
}

*entries<T extends Record<string, Value>>(): IterableIterator<[keyof T, BatchContext<T[keyof T]>]> {
this._throwIfClosed();
for (const [key, value] of this._instance.entries()) {
const ctx = this._rootContext.wrapInstance(value) as unknown as BatchContext<T[keyof T]>;
yield [key, ctx];
}
}

let wrappedObject: BatchContextLiveCounter | BatchContextLiveMap<API.LiveMapType>;
if (originObject instanceof LiveMap) {
wrappedObject = new BatchContextLiveMap(this, this._realtimeObject, originObject);
} else if (originObject instanceof LiveCounter) {
wrappedObject = new BatchContextLiveCounter(this, this._realtimeObject, originObject);
} else {
throw new this._client.ErrorInfo(
`Unknown LiveObject instance type: objectId=${originObject.getObjectId()}`,
50000,
500,
);
*keys<T extends Record<string, Value>>(): IterableIterator<keyof T> {
this._throwIfClosed();
yield* this._instance.keys();
}

*values<T extends Record<string, Value>>(): IterableIterator<BatchContext<T[keyof T]>> {
this._throwIfClosed();
for (const [_, value] of this.entries<T>()) {
yield value;
}
}

this._wrappedObjects.set(objectId, wrappedObject);
return wrappedObject;
size(): number | undefined {
this._throwIfClosed();
return this._instance.size();
}

/**
* @internal
*/
throwIfClosed(): void {
if (this.isClosed()) {
throw new this._client.ErrorInfo('Batch is closed', 40000, 400);
set(key: string, value: Value): void {
this._throwIfClosed();
if (!(this._instance as DefaultInstance<Value>).isLiveMap()) {
throw new this._client.ErrorInfo('Cannot set a key on a non-LiveMap instance', 92007, 400);
}
this._rootContext.queueMessages(async () =>
LiveMap.createMapSetMessage(this._realtimeObject, this._instance.id()!, key, value as Primitive),
);
}

/**
* @internal
*/
isClosed(): boolean {
return this._isClosed;
remove(key: string): void {
this._throwIfClosed();
if (!(this._instance as DefaultInstance<Value>).isLiveMap()) {
throw new this._client.ErrorInfo('Cannot remove a key from a non-LiveMap instance', 92007, 400);
}
this._rootContext.queueMessages(async () => [
LiveMap.createMapRemoveMessage(this._realtimeObject, this._instance.id()!, key),
]);
}

/**
* @internal
*/
close(): void {
this._isClosed = true;
increment(amount?: number): void {
this._throwIfClosed();
if (!(this._instance as DefaultInstance<Value>).isLiveCounter()) {
throw new this._client.ErrorInfo('Cannot increment a non-LiveCounter instance', 92007, 400);
}
this._rootContext.queueMessages(async () => [
LiveCounter.createCounterIncMessage(this._realtimeObject, this._instance.id()!, amount ?? 1),
]);
}

/**
* @internal
*/
queueMessage(msg: ObjectMessage): void {
this._queuedMessages.push(msg);
decrement(amount?: number): void {
this._throwIfClosed();
if (!(this._instance as DefaultInstance<Value>).isLiveCounter()) {
throw new this._client.ErrorInfo('Cannot decrement a non-LiveCounter instance', 92007, 400);
}
this.increment(-(amount ?? 1));
}

/**
* @internal
*/
async flush(): Promise<void> {
try {
this.close();

if (this._queuedMessages.length > 0) {
await this._realtimeObject.publish(this._queuedMessages);
}
} finally {
this._wrappedObjects.clear();
this._queuedMessages = [];
private _throwIfClosed(): void {
if (this._rootContext.isClosed()) {
throw new this._client.ErrorInfo('Batch is closed', 40000, 400);
}
}
}
41 changes: 0 additions & 41 deletions src/plugins/objects/batchcontextlivecounter.ts

This file was deleted.

62 changes: 0 additions & 62 deletions src/plugins/objects/batchcontextlivemap.ts

This file was deleted.

35 changes: 33 additions & 2 deletions src/plugins/objects/instance.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import type BaseClient from 'common/lib/client/baseclient';
import type {
AnyInstance,
BatchContext,
BatchFunction,
CompactedValue,
EventCallback,
Instance,
InstanceSubscriptionEvent,
LiveObject as LiveObjectType,
Primitive,
SubscribeResponse,
Value,
Expand All @@ -14,6 +17,7 @@ import { LiveMap } from './livemap';
import { LiveObject } from './liveobject';
import { ObjectMessage } from './objectmessage';
import { RealtimeObject } from './realtimeobject';
import { RootBatchContext } from './rootbatchcontext';

export interface InstanceEvent {
/** Object message that caused this event */
Expand Down Expand Up @@ -112,9 +116,12 @@ export class DefaultInstance<T extends Value> implements AnyInstance<T> {
}

*keys<U extends Record<string, Value>>(): IterableIterator<keyof U> {
for (const [key] of this.entries<U>()) {
yield key;
if (!(this._value instanceof LiveMap)) {
// return empty iterator for non-LiveMap objects
return;
}

yield* this._value.keys();
}

*values<U extends Record<string, Value>>(): IterableIterator<Instance<U[keyof U]>> {
Expand Down Expand Up @@ -183,4 +190,28 @@ export class DefaultInstance<T extends Value> implements AnyInstance<T> {
return unsubscribe;
});
}

async batch<T extends LiveObjectType = LiveObjectType>(fn: BatchFunction<T>): Promise<void> {
if (!(this._value instanceof LiveObject)) {
throw new this._client.ErrorInfo('Cannot batch operations on a non-LiveObject instance', 92007, 400);
}

const ctx = new RootBatchContext(this._realtimeObject, this);
try {
fn(ctx as unknown as BatchContext<T>);
await ctx.flush();
} finally {
ctx.close();
}
}

/** @internal */
public isLiveMap(): boolean {
return this._value instanceof LiveMap;
}

/** @internal */
public isLiveCounter(): boolean {
return this._value instanceof LiveCounter;
}
}
Loading
Loading