Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DTP-1033] Add LiveObjects edit API #1948

Open
wants to merge 7 commits into
base: integration/liveobjects
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/lib/client/baseclient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { HTTPRequestImplementations } from 'platform/web/lib/http/http';
import { FilteredSubscriptions } from './filteredsubscriptions';
import type { LocalDevice } from 'plugins/push/pushactivation';
import EventEmitter from '../util/eventemitter';
import { MessageEncoding } from '../types/message';

type BatchResult<T> = API.BatchResult<T>;
type BatchPublishSpec = API.BatchPublishSpec;
Expand Down Expand Up @@ -181,6 +182,7 @@ class BaseClient {
Defaults = Defaults;
Utils = Utils;
EventEmitter = EventEmitter;
MessageEncoding = MessageEncoding;
}

export default BaseClient;
15 changes: 13 additions & 2 deletions src/common/lib/client/realtimechannel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ import Message, {
fromValuesArray as messagesFromValuesArray,
encodeArray as encodeMessagesArray,
decode as decodeMessage,
decodeData,
getMessagesSize,
CipherOptions,
EncodingDecodingContext,
MessageEncoding,
} from '../types/message';
import ChannelStateChange from './channelstatechange';
import ErrorInfo, { PartialErrorInfo } from '../types/errorinfo';
Expand Down Expand Up @@ -511,6 +511,17 @@ class RealtimeChannel extends EventEmitter {
this.sendMessage(msg, callback);
}

sendState(state: StateMessage[]): Promise<void> {
return new Promise((resolve, reject) => {
const msg = protocolMessageFromValues({
action: actions.STATE,
channel: this.name,
state,
});
this.sendMessage(msg, (err) => (err ? reject(err) : resolve()));
});
}

// Access to this method is synchronised by ConnectionManager#processChannelMessage, in order to synchronise access to the state stored in _decodingContext.
async processMessage(message: ProtocolMessage): Promise<void> {
if (
Expand Down Expand Up @@ -615,7 +626,7 @@ class RealtimeChannel extends EventEmitter {
const options = this.channelOptions;
await this._decodeAndPrepareMessages(message, stateMessages, (msg) =>
this.client._LiveObjectsPlugin
? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, decodeData)
? this.client._LiveObjectsPlugin.StateMessage.decode(msg, options, MessageEncoding)
: Utils.throwMissingPluginError('LiveObjects'),
);

Expand Down
3 changes: 2 additions & 1 deletion src/common/lib/transport/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ export class PendingMessage {
this.merged = false;
const action = message.action;
this.sendAttempted = false;
this.ackRequired = action == actions.MESSAGE || action == actions.PRESENCE;
this.ackRequired =
typeof action === 'number' && [actions.MESSAGE, actions.PRESENCE, actions.STATE].includes(action);
}
}

Expand Down
177 changes: 130 additions & 47 deletions src/common/lib/types/message.ts
Original file line number Diff line number Diff line change
Expand Up @@ -131,41 +131,120 @@ export async function fromEncodedArray(
);
}

async function encrypt<T extends Message | PresenceMessage>(msg: T, options: CipherOptions): Promise<T> {
let data = msg.data,
encoding = msg.encoding,
cipher = options.channelCipher;

encoding = encoding ? encoding + '/' : '';
if (!Platform.BufferUtils.isBuffer(data)) {
data = Platform.BufferUtils.utf8Encode(String(data));
encoding = encoding + 'utf-8/';
}
const ciphertext = await cipher.encrypt(data);
msg.data = ciphertext;
msg.encoding = encoding + 'cipher+' + cipher.algorithm;
async function encrypt<T extends Message | PresenceMessage>(msg: T, cipherOptions: CipherOptions): Promise<T> {
const { data, encoding } = await encryptData(msg.data, msg.encoding, cipherOptions);
msg.data = data;
msg.encoding = encoding;
return msg;
}

export async function encode<T extends Message | PresenceMessage>(msg: T, options: CipherOptions): Promise<T> {
const data = msg.data;
export async function encryptData(
data: any,
encoding: string | null | undefined,
cipherOptions: CipherOptions,
): Promise<{ data: any; encoding: string | null | undefined }> {
let cipher = cipherOptions.channelCipher;
let dataToEncrypt = data;
let finalEncoding = encoding ? encoding + '/' : '';

if (!Platform.BufferUtils.isBuffer(dataToEncrypt)) {
dataToEncrypt = Platform.BufferUtils.utf8Encode(String(dataToEncrypt));
finalEncoding = finalEncoding + 'utf-8/';
}

const ciphertext = await cipher.encrypt(dataToEncrypt);
finalEncoding = finalEncoding + 'cipher+' + cipher.algorithm;

return {
data: ciphertext,
encoding: finalEncoding,
};
}

/**
* Protocol agnostic encoding and encryption of the message's payload. Mutates the message.
* Implements RSL4 (only parts that are common for all protocols), and RSL5.
*
* Since this encoding function is protocol agnostic, it won't apply the final encodings
* required by the protocol used by the client (like encoding binary data to the appropriate representation).
*/
export async function encode<T extends Message | PresenceMessage>(msg: T, cipherOptions: CipherOptions): Promise<T> {
const { data, encoding } = encodeData(msg.data, msg.encoding);
msg.data = data;
msg.encoding = encoding;

if (cipherOptions != null && cipherOptions.cipher) {
return encrypt(msg, cipherOptions);
} else {
return msg;
}
}

/**
* Protocol agnostic encoding of the provided payload data. Implements RSL4 (only parts that are common for all protocols).
*/
export function encodeData(
data: any,
encoding: string | null | undefined,
): { data: any; encoding: string | null | undefined } {
// RSL4a, supported types
const nativeDataType =
typeof data == 'string' || Platform.BufferUtils.isBuffer(data) || data === null || data === undefined;

if (!nativeDataType) {
if (Utils.isObject(data) || Array.isArray(data)) {
msg.data = JSON.stringify(data);
msg.encoding = msg.encoding ? msg.encoding + '/json' : 'json';
} else {
throw new ErrorInfo('Data type is unsupported', 40013, 400);
}
if (nativeDataType) {
// nothing to do with the native data types at this point
return {
data,
encoding,
};
}

if (options != null && options.cipher) {
return encrypt(msg, options);
} else {
return msg;
if (Utils.isObject(data) || Array.isArray(data)) {
// RSL4c3 and RSL4d3, encode objects and arrays as strings
return {
data: JSON.stringify(data),
encoding: encoding ? encoding + '/json' : 'json',
};
}

// RSL4a, throw an error for unsupported types
throw new ErrorInfo('Data type is unsupported', 40013, 400);
}

/**
* Prepares the payload data to be transmitted over the wire to Ably.
* Encodes the data depending on the selected protocol format.
*
* Implements RSL4c1 and RSL4d1
*/
export function encodeDataForWireProtocol(
data: any,
encoding: string | null | undefined,
format: Utils.Format,
): { data: any; encoding: string | null | undefined } {
if (!data || !Platform.BufferUtils.isBuffer(data)) {
// no transformation required for non-buffer payloads
return {
data,
encoding,
};
}

if (format === Utils.Format.msgpack) {
// RSL4c1
// BufferUtils.toBuffer returns a datatype understandable by that platform's msgpack implementation:
// Buffer in node, Uint8Array in browsers
return {
data: Platform.BufferUtils.toBuffer(data),
encoding,
};
}

// RSL4d1, encode binary payload as base64 string
return {
data: Platform.BufferUtils.base64Encode(data),
encoding: encoding ? encoding + '/base64' : 'base64',
};
}

export async function encodeArray(messages: Array<Message>, options: CipherOptions): Promise<Array<Message>> {
Expand All @@ -178,6 +257,8 @@ export async function decode(
message: Message | PresenceMessage,
inputContext: CipherOptions | EncodingDecodingContext | ChannelOptions,
): Promise<void> {
// data can be decoded partially and throw an error on a later decoding step.
// so we need to reassign the data and encoding values we got, and only then throw an error if there is one
const { data, encoding, error } = await decodeData(message.data, message.encoding, inputContext);
message.data = data;
VeskeR marked this conversation as resolved.
Show resolved Hide resolved
message.encoding = encoding;
Expand All @@ -187,6 +268,9 @@ export async function decode(
}
}

/**
* Implements RSL6
*/
export async function decodeData(
data: any,
encoding: string | null | undefined,
Expand All @@ -199,8 +283,8 @@ export async function decodeData(
const context = normaliseContext(inputContext);
let lastPayload = data;
let decodedData = data;
let finalEncoding: string | null | undefined = encoding;
let decodingError: ErrorInfo | undefined = undefined;
let finalEncoding = encoding;
let decodingError: ErrorInfo | undefined;

if (encoding) {
const xforms = encoding.split('/');
Expand Down Expand Up @@ -358,6 +442,13 @@ export function getMessagesSize(messages: Message[]): number {
return total;
}

export const MessageEncoding = {
encryptData,
encodeData,
encodeDataForWireProtocol,
decodeData,
};

class Message {
name?: string;
id?: string;
Expand All @@ -378,27 +469,19 @@ class Message {
operation?: API.Operation;

/**
* Overload toJSON() to intercept JSON.stringify()
* @return {*}
* Overload toJSON() to intercept JSON.stringify().
*
* This will prepare the message to be transmitted over the wire to Ably.
* It will encode the data payload according to the wire protocol used on the client.
* It will transform any client-side enum string representations into their corresponding numbers, if needed (like "action" fields).
*/
toJSON() {
/* encode data to base64 if present and we're returning real JSON;
* although msgpack calls toJSON(), we know it is a stringify()
* call if it has a non-empty arguments list */
let encoding = this.encoding;
let data = this.data;
if (data && Platform.BufferUtils.isBuffer(data)) {
if (arguments.length > 0) {
/* stringify call */
encoding = encoding ? encoding + '/base64' : 'base64';
data = Platform.BufferUtils.base64Encode(data);
} else {
/* Called by msgpack. toBuffer returns a datatype understandable by
* that platform's msgpack implementation (Buffer in node, Uint8Array
* in browsers) */
data = Platform.BufferUtils.toBuffer(data);
}
}
// we can infer the format used by client by inspecting with what arguments this method was called.
// if JSON protocol is being used, the JSON.stringify() will be called and this toJSON() method will have a non-empty arguments list.
// MSGPack protocol implementation also calls toJSON(), but with an empty arguments list.
const format = arguments.length > 0 ? Utils.Format.json : Utils.Format.msgpack;
const { data, encoding } = encodeDataForWireProtocol(this.data, this.encoding, format);

VeskeR marked this conversation as resolved.
Show resolved Hide resolved
return {
name: this.name,
id: this.id,
Expand Down
40 changes: 19 additions & 21 deletions src/common/lib/types/presencemessage.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import Logger from '../util/logger';
import Platform from 'common/platform';
import { encode as encodeMessage, decode as decodeMessage, getMessagesSize, CipherOptions } from './message';
import {
encode as encodeMessage,
decode as decodeMessage,
getMessagesSize,
CipherOptions,
encodeDataForWireProtocol,
} from './message';
import * as Utils from '../util/utils';
import * as API from '../../../../ably';
import { MsgPack } from 'common/types/msgpack';
Expand Down Expand Up @@ -128,8 +134,11 @@ class PresenceMessage {
}

/**
* Overload toJSON() to intercept JSON.stringify()
* @return {*}
* Overload toJSON() to intercept JSON.stringify().
*
* This will prepare the message to be transmitted over the wire to Ably.
* It will encode the data payload according to the wire protocol used on the client.
* It will transform any client-side enum string representations into their corresponding numbers, if needed (like "action" fields).
*/
toJSON(): {
id?: string;
Expand All @@ -139,30 +148,19 @@ class PresenceMessage {
encoding?: string;
extras?: any;
} {
/* encode data to base64 if present and we're returning real JSON;
* although msgpack calls toJSON(), we know it is a stringify()
* call if it has a non-empty arguments list */
let data = this.data as string | Buffer | Uint8Array;
let encoding = this.encoding;
if (data && Platform.BufferUtils.isBuffer(data)) {
if (arguments.length > 0) {
/* stringify call */
encoding = encoding ? encoding + '/base64' : 'base64';
data = Platform.BufferUtils.base64Encode(data);
} else {
/* Called by msgpack. toBuffer returns a datatype understandable by
* that platform's msgpack implementation (Buffer in node, Uint8Array
* in browsers) */
data = Platform.BufferUtils.toBuffer(data);
}
}
// we can infer the format used by client by inspecting with what arguments this method was called.
// if JSON protocol is being used, the JSON.stringify() will be called and this toJSON() method will have a non-empty arguments list.
// MSGPack protocol implementation also calls toJSON(), but with an empty arguments list.
const format = arguments.length > 0 ? Utils.Format.json : Utils.Format.msgpack;
const { data, encoding } = encodeDataForWireProtocol(this.data, this.encoding, format);

return {
id: this.id,
clientId: this.clientId,
/* Convert presence action back to an int for sending to Ably */
action: toActionValue(this.action as string),
data: data,
encoding: encoding,
encoding: encoding!,
extras: this.extras,
};
}
Expand Down
12 changes: 8 additions & 4 deletions src/common/lib/types/protocolmessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,16 @@ import * as API from '../../../../ably';
import { PresenceMessagePlugin } from '../client/modularplugins';
import * as Utils from '../util/utils';
import ErrorInfo from './errorinfo';
import Message, { fromValues as messageFromValues, fromValuesArray as messagesFromValuesArray } from './message';
import Message, {
fromValues as messageFromValues,
fromValuesArray as messagesFromValuesArray,
MessageEncoding,
} from './message';
import PresenceMessage, {
fromValues as presenceMessageFromValues,
fromValuesArray as presenceMessagesFromValuesArray,
} from './presencemessage';
import type * as LiveObjectsPlugin from 'plugins/liveobjects';
import Platform from '../../platform';

export const actions = {
HEARTBEAT: 0,
Expand Down Expand Up @@ -128,7 +131,7 @@ export function fromDeserialized(
state = deserialized.state as LiveObjectsPlugin.StateMessage[];
if (state) {
for (let i = 0; i < state.length; i++) {
state[i] = liveObjectsPlugin.StateMessage.fromValues(state[i], Platform);
state[i] = liveObjectsPlugin.StateMessage.fromValues(state[i], Utils, MessageEncoding);
}
}
}
Expand Down Expand Up @@ -177,7 +180,8 @@ export function stringify(
if (msg.presence && presenceMessagePlugin)
result += '; presence=' + toStringArray(presenceMessagePlugin.presenceMessagesFromValuesArray(msg.presence));
if (msg.state && liveObjectsPlugin) {
result += '; state=' + toStringArray(liveObjectsPlugin.StateMessage.fromValuesArray(msg.state, Platform));
result +=
'; state=' + toStringArray(liveObjectsPlugin.StateMessage.fromValuesArray(msg.state, Utils, MessageEncoding));
}
if (msg.error) result += '; error=' + ErrorInfo.fromValues(msg.error).toString();
if (msg.auth && msg.auth.accessToken) result += '; token=' + msg.auth.accessToken;
Expand Down
Loading
Loading