From 6151a9fecd9e975efe666d2d492fe5c2594654ab Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 4 Oct 2024 06:04:55 +0100 Subject: [PATCH 01/16] Fix usePresence won't leave presence if unmount triggered during channel attaching This fixes a race condition issue, where a React app might trigger a component unmount while channel is still in the `attaching` state, thus `presence.leave` was never called for a channel and member would remain there until app reload. --- src/platform/react-hooks/src/hooks/usePresence.ts | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/platform/react-hooks/src/hooks/usePresence.ts b/src/platform/react-hooks/src/hooks/usePresence.ts index 42c7a6b6f..a322c4785 100644 --- a/src/platform/react-hooks/src/hooks/usePresence.ts +++ b/src/platform/react-hooks/src/hooks/usePresence.ts @@ -73,9 +73,11 @@ export function usePresence( return () => { // here we use the ably.connection.state property, which upon this cleanup function call // will have the current connection state for that connection, thanks to us accessing the Ably instance here by reference. - // if the connection is in one of the inactive states or the channel is not attached, a presence.leave call will produce an exception. - // so we only leave presence in other cases. - if (channel.state === 'attached' && !INACTIVE_CONNECTION_STATES.includes(ably.connection.state)) { + // if the connection is in one of the inactive states or the channel is not attached/attaching, a presence.leave call will produce an exception. + // so we should only leave presence in other cases. + const canLeaveFromConnectionState = !INACTIVE_CONNECTION_STATES.includes(ably.connection.state); + const canLeaveFromChannelState = ['attached', 'attaching'].includes(channel.state); + if (canLeaveFromChannelState && canLeaveFromConnectionState) { channel.presence.leave(); } }; From e522ca714e8feb3235566c27b93f1d0f10b8bfb0 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 4 Oct 2024 15:41:48 +0100 Subject: [PATCH 02/16] chore: update changelog for 2.4.1 release --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 39e1f7b36..21f3eb89c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ This contains only the most important and/or user-facing changes; for a full changelog, see the commit history. +## [2.4.1](https://github.com/ably/ably-js/tree/2.4.1) (2024-10-04) + +- Fix `usePresence` hook wasn't leaving presence if component unmounted during channel attaching state [\#1884](https://github.com/ably/ably-js/pull/1884) + ## [2.4.0](https://github.com/ably/ably-js/tree/2.4.0) (2024-09-11) - Add `wsConnectivityCheckUrl` client option [\#1862](https://github.com/ably/ably-js/pull/1862) From b3af19a76ffcc019c5d1afe0281bf7981e6d2166 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 4 Oct 2024 15:42:22 +0100 Subject: [PATCH 03/16] chore: bump version for 2.4.1 release --- package-lock.json | 4 ++-- package.json | 2 +- src/platform/react-hooks/src/AblyReactHooks.ts | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package-lock.json b/package-lock.json index 63325af50..c5f3dea2d 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "ably", - "version": "2.4.0", + "version": "2.4.1", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "ably", - "version": "2.4.0", + "version": "2.4.1", "license": "Apache-2.0", "dependencies": { "@ably/msgpack-js": "^0.4.0", diff --git a/package.json b/package.json index 5bb15dd54..0a8271645 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ably", "description": "Realtime client library for Ably, the realtime messaging service", - "version": "2.4.0", + "version": "2.4.1", "license": "Apache-2.0", "bugs": { "url": "https://github.com/ably/ably-js/issues", diff --git a/src/platform/react-hooks/src/AblyReactHooks.ts b/src/platform/react-hooks/src/AblyReactHooks.ts index 9874cd8f7..aa421f28c 100644 --- a/src/platform/react-hooks/src/AblyReactHooks.ts +++ b/src/platform/react-hooks/src/AblyReactHooks.ts @@ -12,7 +12,7 @@ export type ChannelNameAndOptions = { export type ChannelNameAndAblyId = Pick; export type ChannelParameters = string | ChannelNameAndOptions; -export const version = '2.4.0'; +export const version = '2.4.1'; export function channelOptionsWithAgent(options?: Ably.ChannelOptions) { return { From 6c59ffc00fe936ed7f8803f6b6532520f62f0ccf Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Wed, 23 Oct 2024 13:09:39 +0100 Subject: [PATCH 04/16] Add README section to explain "Connection limit exceeded" error and solutions Resolves ECO-4841 --- README.md | 20 ++++++++++++++++++++ docs/react.md | 1 + 2 files changed, 21 insertions(+) diff --git a/README.md b/README.md index 0f7c27796..92c9f6113 100644 --- a/README.md +++ b/README.md @@ -641,6 +641,26 @@ The issue is coming from the fact that when using App Router specifically depend Using `serverComponentsExternalPackages` opt-outs from using Next.js bundling for specific packages and uses native Node.js `require` instead. This is a common problem in App Router for a number of packages (for example, see next.js issue [vercel/next.js#52876](https://github.com/vercel/next.js/issues/52876)), and using `serverComponentsExternalPackages` is the recommended approach here. +#### "Connection limit exceeded" error during development + +If you're encountering a "Connection limit exceeded" error when trying to connect to Ably servers during the development of your application, and you notice spikes or linear increases in the connection count on the Ably dashboard for your app, this may be due to one of the following reasons: + +- If you're using Next.js, your `Ably.Realtime` client instance may be created multiple times on the server side (i.e., in a Node.js process) as you're developing your app, due to Next.js server side rendering your components. Note that even for "Client Components" (i.e., components with the 'use client' directive), [Next.js may still run the component code on the server in order to pre-render HTML](https://nextjs.org/docs/app/building-your-application/rendering/client-components#how-are-client-components-rendered). Depending on your client configuration options, those clients may also successfully open a connection to Ably servers from that Node.js process, which won't close until you restart your development server. + + The simplest fix is to use the `autoConnect` client option and check if the client is created on the server side with a simple window object check, like this: + + ```typescript + const client = new Ably.Realtime({ key: 'your-ably-api-key', autoConnect: typeof window !== 'undefined' }); + ``` + + This will prevent the client from connecting to Ably servers if it is created on the server side, while not affecting your client side components. + +- If you're using any React-based framework, you may be recreating the `Ably.Realtime` client instance on every component re-render. To avoid this, and to prevent potentially reaching the maximum connections limit on your account, move the client instantiation (`new Ably.Realtime`) outside of your components. You can find an example in our [React docs](./docs/react.md#Usage). + +- The connection limit error can be caused by the Hot Reloading mechanism of your development environment (called Fast Refresh in newer Next.js versions, or more generally, Hot Module Replacement - HMR). When you edit and save a file that contains a `new Ably.Realtime()` call in an environment that supports HMR (such as React, Vite, or Next.js apps), the file gets refreshed and creates a new `Ably.Realtime` client instance. However, the previous client remains in memory, unaware of the replacement, and stays connected to Ably's realtime systems. As a result, your connection count will keep increasing with each file edit as new clients are created. This only resets when you manually refresh the browser page, which closes all clients. This behavior applies to any development environment with an HMR mechanism implemented. + + The solution is simple: move the `new Ably.Realtime()` call to a separate file, such as `ably-client.js`, and export the client instance from there. This way, the client instance will only be recreated when you specifically make changes to the `ably-client.js` file, which should be far less frequent than changes in the rest of the codebase. + ## Contributing For guidance on how to contribute to this project, see the [CONTRIBUTING.md](CONTRIBUTING.md). diff --git a/docs/react.md b/docs/react.md index bd0d32afa..1b2630e00 100644 --- a/docs/react.md +++ b/docs/react.md @@ -36,6 +36,7 @@ The hooks are compatible with all versions of React above 16.8.0 Start by connecting your app to Ably using the `AblyProvider` component. See the [`ClientOptions` documentation](https://ably.com/docs/api/realtime-sdk/types?lang=javascript) for information about what options are available when creating an Ably client. If you want to use the `usePresence` or `usePresenceListener` hooks, you'll need to explicitly provide a `clientId`. The `AblyProvider` should be high in your component tree, wrapping every component which needs to access Ably. +Also, ensure that the `Ably.Realtime` instance is created outside of components to prevent it from being recreated on component re-renders. This will help avoid opening extra unnecessary connections to the Ably servers and potentially reaching the maximum connections limit on your account. ```jsx import { AblyProvider } from 'ably/react'; From e28bfcc2031f793bd19188a8fce01527092f822a Mon Sep 17 00:00:00 2001 From: Steven Lindsay Date: Thu, 17 Oct 2024 11:58:23 +0100 Subject: [PATCH 05/16] materialisation: Add new message attributes and actions handling - Added new message attributes, including `action`, `serial`, `refSerial`, `refType`, `updatedAt`, `deletedAt`, and `operation`. Additionally, create functions to map message actions between string and number representations. This update also changes the `fromValues` function to handle action transformations. --- ably.d.ts | 94 ++++++++++++++++++++++++- package.json | 1 + scripts/moduleReport.ts | 2 +- src/common/lib/client/restchannel.ts | 4 +- src/common/lib/types/defaultmessage.ts | 11 +-- src/common/lib/types/message.ts | 62 ++++++++++++++-- src/common/lib/types/protocolmessage.ts | 18 +++-- test/realtime/crypto.test.js | 2 + test/realtime/message.test.js | 51 ++++++++++++++ test/support/runPlaywrightTests.js | 10 ++- 10 files changed, 236 insertions(+), 19 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index b8e85c6a4..b4bf04ef5 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2335,12 +2335,104 @@ export interface Message { * Timestamp of when the message was received by Ably, as milliseconds since the Unix epoch. */ timestamp?: number; + /** + * The action type of the message, one of the {@link MessageAction} enum values. + */ + action?: MessageAction; + /** + * This message's unique serial. + */ + serial?: string; + /** + * The serial of the message that this message is a reference to. + */ + refSerial?: string; + /** + * The type of reference this message is, in relation to the message it references. + */ + refType?: string; + /** + * If an `update` operation was applied to this message, this will be the timestamp the update occurred. + */ + updatedAt?: number; + /** + * If a `deletion` operation was applied to this message, this will be the timestamp the deletion occurred. + */ + deletedAt?: number; + /** + * If this message resulted from an operation, this will contain the operation details. + */ + operation?: Operation; } +/** + * Contains the details of an operation, such as update or deletion, supplied by the actioning client. + */ +export interface Operation { + /** + * The client ID of the client that initiated the operation. + */ + clientId?: string; + /** + * The description provided by the client that initiated the operation. + */ + description?: string; + /** + * A JSON object of string key-value pairs that may contain metadata associated with the operation. + */ + metadata?: Record; +} + +/** + * The namespace containing the different types of message actions. + */ +declare namespace MessageActions { + /** + * Message action has not been set. + */ + type MESSAGE_UNSET = 'message.unset'; + /** + * Message action for a newly created message. + */ + type MESSAGE_CREATE = 'message.create'; + /** + * Message action for an updated message. + */ + type MESSAGE_UPDATE = 'message.update'; + /** + * Message action for a deleted message. + */ + type MESSAGE_DELETE = 'message.delete'; + /** + * Message action for a newly created annotation. + */ + type ANNOTATION_CREATE = 'annotation.create'; + /** + * Message action for a deleted annotation. + */ + type ANNOTATION_DELETE = 'annotation.delete'; + /** + * Message action for a meta-message that contains channel occupancy information. + */ + type META_OCCUPANCY = 'meta.occupancy'; +} + +/** + * Describes the possible action types used on an {@link Message}. + */ +export type MessageAction = + | MessageActions.MESSAGE_UNSET + | MessageActions.MESSAGE_CREATE + | MessageActions.MESSAGE_UPDATE + | MessageActions.MESSAGE_DELETE + | MessageActions.ANNOTATION_CREATE + | MessageActions.ANNOTATION_DELETE + | MessageActions.META_OCCUPANCY; + /** * A message received from Ably. */ -export type InboundMessage = Message & Required>; +export type InboundMessage = Message & Required>; /** * Static utilities related to messages. diff --git a/package.json b/package.json index 0a8271645..0a82a90ec 100644 --- a/package.json +++ b/package.json @@ -139,6 +139,7 @@ "grunt": "grunt", "test": "npm run test:node", "test:node": "npm run build:node && npm run build:push && mocha", + "test:grep": "npm run build:node && npm run build:push && mocha --grep", "test:node:skip-build": "mocha", "test:webserver": "grunt test:webserver", "test:playwright": "node test/support/runPlaywrightTests.js", diff --git a/scripts/moduleReport.ts b/scripts/moduleReport.ts index 25daba894..aacec5a00 100644 --- a/scripts/moduleReport.ts +++ b/scripts/moduleReport.ts @@ -6,7 +6,7 @@ import { gzip } from 'zlib'; import Table from 'cli-table'; // The maximum size we allow for a minimal useful Realtime bundle (i.e. one that can subscribe to a channel) -const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 98, gzip: 30 }; +const minimalUsefulRealtimeBundleSizeThresholdsKiB = { raw: 100, gzip: 31 }; const baseClientNames = ['BaseRest', 'BaseRealtime']; diff --git a/src/common/lib/client/restchannel.ts b/src/common/lib/client/restchannel.ts index 3138e8611..e27133c6e 100644 --- a/src/common/lib/client/restchannel.ts +++ b/src/common/lib/client/restchannel.ts @@ -2,12 +2,12 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RestPresence from './restpresence'; import Message, { - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, encodeArray as encodeMessagesArray, serialize as serializeMessage, getMessagesSize, CipherOptions, + fromValues as messageFromValues, + fromValuesArray as messagesFromValuesArray, } from '../types/message'; import ErrorInfo from '../types/errorinfo'; import { PaginatedResult } from './paginatedresource'; diff --git a/src/common/lib/types/defaultmessage.ts b/src/common/lib/types/defaultmessage.ts index dfc4a02b1..79fffccf5 100644 --- a/src/common/lib/types/defaultmessage.ts +++ b/src/common/lib/types/defaultmessage.ts @@ -1,10 +1,11 @@ import Message, { CipherOptions, - fromEncoded, - fromEncodedArray, - encode, decode, + encode, EncodingDecodingContext, + fromEncoded, + fromEncodedArray, + fromValues, } from './message'; import * as API from '../../../../ably'; import Platform from 'common/platform'; @@ -25,8 +26,8 @@ export class DefaultMessage extends Message { } // Used by tests - static fromValues(values: unknown): Message { - return Object.assign(new Message(), values); + static fromValues(values: Message | Record, options?: { stringifyAction?: boolean }): Message { + return fromValues(values, options); } // Used by tests diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index 7cc8b80ac..f0d9c5394 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -9,6 +9,30 @@ import * as API from '../../../../ably'; import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; import { MsgPack } from 'common/types/msgpack'; +const MessageActionArray: API.MessageAction[] = [ + 'message.unset', + 'message.create', + 'message.update', + 'message.delete', + 'annotation.create', + 'annotation.delete', + 'meta.occupancy', +]; + +const MessageActionMap = new Map(MessageActionArray.map((action, index) => [action, index])); + +const ReverseMessageActionMap = new Map( + MessageActionArray.map((action, index) => [index, action]), +); + +function toMessageActionString(actionNumber: number): API.MessageAction | undefined { + return ReverseMessageActionMap.get(actionNumber); +} + +function toMessageActionNumber(messageAction?: API.MessageAction): number | undefined { + return messageAction ? MessageActionMap.get(messageAction) : undefined; +} + export type CipherOptions = { channelCipher: { encrypt: Function; @@ -82,7 +106,7 @@ export async function fromEncoded( encoded: unknown, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromValues(encoded); + const msg = fromValues(encoded as Message | Record, { stringifyAction: true }); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); /* if decoding fails at any point, catch and return the message decoded to * the fullest extent possible */ @@ -260,7 +284,7 @@ export async function fromResponseBody( } for (let i = 0; i < body.length; i++) { - const msg = (body[i] = fromValues(body[i])); + const msg = (body[i] = fromValues(body[i], { stringifyAction: true })); try { await decode(msg, options); } catch (e) { @@ -270,14 +294,22 @@ export async function fromResponseBody( return body; } -export function fromValues(values: unknown): Message { +export function fromValues( + values: Message | Record, + options?: { stringifyAction?: boolean }, +): Message { + const stringifyAction = options?.stringifyAction; + if (stringifyAction) { + const action = toMessageActionString(values.action as number) || values.action; + return Object.assign(new Message(), { ...values, action }); + } return Object.assign(new Message(), values); } export function fromValuesArray(values: unknown[]): Message[] { const count = values.length, result = new Array(count); - for (let i = 0; i < count; i++) result[i] = fromValues(values[i]); + for (let i = 0; i < count; i++) result[i] = fromValues(values[i] as Record); return result; } @@ -304,6 +336,13 @@ class Message { encoding?: string | null; extras?: any; size?: number; + action?: API.MessageAction | number; + serial?: string; + refSerial?: string; + refType?: string; + updatedAt?: number; + deletedAt?: number; + operation?: API.Operation; /** * Overload toJSON() to intercept JSON.stringify() @@ -334,6 +373,13 @@ class Message { connectionId: this.connectionId, connectionKey: this.connectionKey, extras: this.extras, + serial: this.serial, + action: toMessageActionNumber(this.action as API.MessageAction) || this.action, + refSerial: this.refSerial, + refType: this.refType, + updatedAt: this.updatedAt, + deletedAt: this.deletedAt, + operation: this.operation, encoding, data, }; @@ -355,6 +401,14 @@ class Message { else result += '; data (json)=' + JSON.stringify(this.data); } if (this.extras) result += '; extras=' + JSON.stringify(this.extras); + + if (this.action) result += '; action=' + this.action; + if (this.serial) result += '; serial=' + this.serial; + if (this.refSerial) result += '; refSerial=' + this.refSerial; + if (this.refType) result += '; refType=' + this.refType; + if (this.updatedAt) result += '; updatedAt=' + this.updatedAt; + if (this.deletedAt) result += '; deletedAt=' + this.deletedAt; + if (this.operation) result += '; operation=' + JSON.stringify(this.operation); result += ']'; return result; } diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index eaa622a8d..ccb15841f 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -81,16 +81,26 @@ export function fromDeserialized( presenceMessagePlugin: PresenceMessagePlugin | null, ): ProtocolMessage { const error = deserialized.error; - if (error) deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); + if (error) { + deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); + } + const messages = deserialized.messages as Message[]; - if (messages) for (let i = 0; i < messages.length; i++) messages[i] = messageFromValues(messages[i]); + if (messages) { + for (let i = 0; i < messages.length; i++) { + messages[i] = messageFromValues(messages[i], { stringifyAction: true }); + } + } const presence = presenceMessagePlugin ? (deserialized.presence as PresenceMessage[]) : undefined; if (presenceMessagePlugin) { - if (presence && presenceMessagePlugin) - for (let i = 0; i < presence.length; i++) + if (presence && presenceMessagePlugin) { + for (let i = 0; i < presence.length; i++) { presence[i] = presenceMessagePlugin.presenceMessageFromValues(presence[i], true); + } + } } + return Object.assign(new ProtocolMessage(), { ...deserialized, presence }); } diff --git a/test/realtime/crypto.test.js b/test/realtime/crypto.test.js index 14cb65330..18df2f11e 100644 --- a/test/realtime/crypto.test.js +++ b/test/realtime/crypto.test.js @@ -395,6 +395,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('call.msgpack.decode'); var messageFromMsgpack = Message.fromValues( msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)), + { stringifyAction: true }, ); try { @@ -439,6 +440,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('call.msgpack.decode'); var messageFromMsgpack = Message.fromValues( msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)), + { stringifyAction: true }, ); try { diff --git a/test/realtime/message.test.js b/test/realtime/message.test.js index 6d0fe8f7c..6cb2cfd0a 100644 --- a/test/realtime/message.test.js +++ b/test/realtime/message.test.js @@ -4,6 +4,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async var expect = chai.expect; let config = Ably.Realtime.Platform.Config; var createPM = Ably.protocolMessageFromDeserialized; + var Message = Ably.Realtime.Message; var publishIntervalHelper = function (currentMessageNum, channel, dataFn, onPublish) { return function () { @@ -1271,6 +1272,56 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channel.publish('end', null); }); }); + /** + * @spec TM2j + */ + describe('DefaultMessage.fromValues stringify action', function () { + const testCases = [ + { + description: 'should stringify the numeric action', + action: 1, + options: { stringifyAction: true }, + expectedString: '[Message; action=message.create]', + expectedJSON: { action: 1 }, + }, + { + description: 'should not stringify the numeric action', + action: 1, + options: { stringifyAction: false }, + expectedString: '[Message; action=1]', + expectedJSON: { action: 1 }, + }, + { + description: 'should accept an already stringified action', + action: 'message.update', + options: { stringifyAction: true }, + expectedString: '[Message; action=message.update]', + expectedJSON: { action: 2 }, + }, + { + description: 'should handle no action provided', + action: undefined, + options: { stringifyAction: true }, + expectedString: '[Message]', + expectedJSON: { action: undefined }, + }, + { + description: 'should handle unknown action provided', + action: 10, + options: { stringifyAction: true }, + expectedString: '[Message; action=10]', + expectedJSON: { action: 10 }, + }, + ]; + testCases.forEach(({ description, action, options, expectedString, expectedJSON }) => { + it(description, function () { + const values = { action }; + const message = Message.fromValues(values, options); + expect(message.toString()).to.equal(expectedString); + expect(message.toJSON()).to.deep.contains(expectedJSON); + }); + }); + }); /** * @spec RTS5 diff --git a/test/support/runPlaywrightTests.js b/test/support/runPlaywrightTests.js index edbcc1531..5ff2d9106 100644 --- a/test/support/runPlaywrightTests.js +++ b/test/support/runPlaywrightTests.js @@ -68,14 +68,20 @@ const runTests = async (browserType) => { // Use page.evaluate to add these functions as event listeners to the 'testLog' and 'testResult' Custom Events. // These events are fired by the custom mocha reporter in playwrightSetup.js - page.evaluate(() => { + const grep = process.env.MOCHA_GREP; + page.evaluate((grep) => { window.addEventListener('testLog', ({ type, detail }) => { onTestLog({ type, detail }); }); window.addEventListener('testResult', ({ type, detail }) => { onTestResult({ type, detail }); }); - }); + // Set grep pattern in the browser context + // allows easy filtering of tests. + if (grep) { + window.mocha.grep(new RegExp(grep)); + } + }, grep); }); }; From 21e42ea3198e820af729359fd72b2e8eefe5888b Mon Sep 17 00:00:00 2001 From: Steven Lindsay Date: Thu, 31 Oct 2024 15:37:36 +0000 Subject: [PATCH 06/16] Refactor `Message` type to remove deletedAt and add updateSerial Added an updateSerial to allow LWW semantics to be applied to a message resulting from an operation. Removed the deletedAt timestamp, we will rely only on updateSerial to apply ordering of operations. UpdateAt will still be used, but just so clients can render a simple datetime and not have to understand/parse the serial. --- ably.d.ts | 4 ++-- src/common/lib/types/message.ts | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index b4bf04ef5..f9e59b1d9 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2356,9 +2356,9 @@ export interface Message { */ updatedAt?: number; /** - * If a `deletion` operation was applied to this message, this will be the timestamp the deletion occurred. + * The serial of the operation that updated this message. */ - deletedAt?: number; + updateSerial?: string; /** * If this message resulted from an operation, this will contain the operation details. */ diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index f0d9c5394..0a9f0d419 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -341,7 +341,7 @@ class Message { refSerial?: string; refType?: string; updatedAt?: number; - deletedAt?: number; + updateSerial?: string; operation?: API.Operation; /** @@ -378,7 +378,7 @@ class Message { refSerial: this.refSerial, refType: this.refType, updatedAt: this.updatedAt, - deletedAt: this.deletedAt, + updateSerial: this.updateSerial, operation: this.operation, encoding, data, @@ -407,7 +407,7 @@ class Message { if (this.refSerial) result += '; refSerial=' + this.refSerial; if (this.refType) result += '; refType=' + this.refType; if (this.updatedAt) result += '; updatedAt=' + this.updatedAt; - if (this.deletedAt) result += '; deletedAt=' + this.deletedAt; + if (this.updateSerial) result += '; updateSerial=' + this.updateSerial; if (this.operation) result += '; operation=' + JSON.stringify(this.operation); result += ']'; return result; From 2b0a1abd4a0e0264f06724d62e2ae50e6670a720 Mon Sep 17 00:00:00 2001 From: Steven Lindsay Date: Thu, 31 Oct 2024 12:04:15 +0000 Subject: [PATCH 07/16] Release version 2.5.0 Updated CHANGELOG and package files to reflect the new version 2.5.0. Added support for new `Message` attributes essential for upcoming features (updates/deletes/annotations) in the Ably service. --- CHANGELOG.md | 7 +++++++ package-lock.json | 4 ++-- package.json | 2 +- src/platform/react-hooks/src/AblyReactHooks.ts | 2 +- 4 files changed, 11 insertions(+), 4 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21f3eb89c..6338fc79b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,13 @@ This contains only the most important and/or user-facing changes; for a full changelog, see the commit history. +## [2.5.0](https://github.com/ably/ably-js/tree/2.5.0) (2024-11-06) + +With this release, ably-js will now expose the new `Message` attributes needed to support upcoming features, +such as updates, deletions and annotations in the Ably service. + +- Added support for new `Message` attributes. [\#1888](https://github.com/ably/ably-js/pull/1888) + ## [2.4.1](https://github.com/ably/ably-js/tree/2.4.1) (2024-10-04) - Fix `usePresence` hook wasn't leaving presence if component unmounted during channel attaching state [\#1884](https://github.com/ably/ably-js/pull/1884) diff --git a/package-lock.json b/package-lock.json index c5f3dea2d..b43483eb0 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "ably", - "version": "2.4.1", + "version": "2.5.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "ably", - "version": "2.4.1", + "version": "2.5.0", "license": "Apache-2.0", "dependencies": { "@ably/msgpack-js": "^0.4.0", diff --git a/package.json b/package.json index 0a82a90ec..dd720d2d5 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ably", "description": "Realtime client library for Ably, the realtime messaging service", - "version": "2.4.1", + "version": "2.5.0", "license": "Apache-2.0", "bugs": { "url": "https://github.com/ably/ably-js/issues", diff --git a/src/platform/react-hooks/src/AblyReactHooks.ts b/src/platform/react-hooks/src/AblyReactHooks.ts index aa421f28c..a4aa8b86f 100644 --- a/src/platform/react-hooks/src/AblyReactHooks.ts +++ b/src/platform/react-hooks/src/AblyReactHooks.ts @@ -12,7 +12,7 @@ export type ChannelNameAndOptions = { export type ChannelNameAndAblyId = Pick; export type ChannelParameters = string | ChannelNameAndOptions; -export const version = '2.4.1'; +export const version = '2.5.0'; export function channelOptionsWithAgent(options?: Ably.ChannelOptions) { return { From 831d20cac3016cedc8b0abe257b20298bb930ae7 Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Fri, 25 Oct 2024 15:13:10 -0400 Subject: [PATCH 08/16] Presence_auto_reenter test: refactor and modernise --- test/realtime/presence.test.js | 171 +++++++++++++-------------------- 1 file changed, 66 insertions(+), 105 deletions(-) diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index f2de186fd..adede1432 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -1627,117 +1627,78 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * @spec RTP17g * @specpartial RTP17i - tests simple re-entry, no RESUMED flag test */ - it('presence_auto_reenter', function (done) { + it('presence_auto_reenter', async function () { const helper = this.test.helper; - var channelName = 'presence_auto_reenter'; - var realtime = helper.AblyRealtime(); - var channel = realtime.channels.get(channelName); + const channelName = 'presence_auto_reenter'; + const realtime = helper.AblyRealtime(); + const channel = realtime.channels.get(channelName); + + await realtime.connection.once('connected'); + await channel.attach(); + // presence.get will wait for a sync if needed + await channel.presence.get() + + const pOnPresence = channel.presence.subscriptions.once('enter'); + await channel.presence.enterClient('one', 'onedata'); + await pOnPresence; + + /* inject an additional member into the myMember set, then force a suspended state */ + helper.recordPrivateApi('read.connectionManager.connectionId'); + const connId = realtime.connection.connectionManager.connectionId; + + helper.recordPrivateApi('call.presence._myMembers.put'); + channel.presence._myMembers.put({ + action: 'enter', + clientId: 'two', + connectionId: connId, + id: connId + ':0:0', + data: 'twodata', + }); - async.series( - [ - function (cb) { - realtime.connection.once('connected', function () { - cb(); - }); - }, - function (cb) { - Helper.whenPromiseSettles(channel.attach(), cb); - }, - function (cb) { - if (!channel.presence.syncComplete) { - helper.recordPrivateApi('call.presence.waitSync'); - channel.presence.members.waitSync(cb); - } else { - cb(); - } - }, - function (cb) { - channel.presence.enterClient('one', 'onedata'); - channel.presence.subscribe('enter', function () { - channel.presence.unsubscribe('enter'); - cb(); - }); - }, - function (cb) { - /* inject an additional member into the myMember set, then force a suspended state */ - helper.recordPrivateApi('read.connectionManager.connectionId'); - var connId = realtime.connection.connectionManager.connectionId; - helper.recordPrivateApi('call.presence._myMembers.put'); - channel.presence._myMembers.put({ - action: 'enter', - clientId: 'two', - connectionId: connId, - id: connId + ':0:0', - data: 'twodata', - }); - helper.becomeSuspended(realtime, cb); - }, - function (cb) { + await helper.becomeSuspended(realtime); + + expect(channel.state).to.equal('suspended', 'sanity-check channel state'); + + /* Reconnect */ + const pOnceAttached = channel.once('attached'); + realtime.connection.connect(); + await pOnceAttached; + + /* Since we haven't been gone for two minutes, we don't know for sure + * that realtime will feel it necessary to do a sync - if it doesn't, + * we request one */ + if (channel.presence.syncComplete) { + helper.recordPrivateApi('call.channel.sync'); + channel.sync(); + } + await channel.presence.get(); + + /* Now just wait for an enter! */ + const enteredMembers = new Set(); + await new Promise((resolve, reject) => { + channel.presence.subscribe('enter', (presmsg) => { + enteredMembers.add(presmsg.clientId); + if (enteredMembers.size === 2) { try { - expect(channel.state).to.equal('suspended', 'sanity-check channel state'); + expect(enteredMembers.has('one')).to.equal(true, 'Check client one entered'); + expect(enteredMembers.has('two')).to.equal(true, 'Check client two entered'); + channel.presence.unsubscribe('enter'); + resolve(); } catch (err) { - cb(err); + reject(err); return; } - /* Reconnect */ - realtime.connection.connect(); - channel.once('attached', function () { - cb(); - }); - }, - function (cb) { - /* Since we haven't been gone for two minutes, we don't know for sure - * that realtime will feel it necessary to do a sync - if it doesn't, - * we request one */ - if (channel.presence.syncComplete) { - helper.recordPrivateApi('call.channel.sync'); - channel.sync(); - } - helper.recordPrivateApi('call.presence.waitSync'); - channel.presence.members.waitSync(cb); - }, - function (cb) { - /* Now just wait for an enter! */ - let enteredMembers = new Set(); - channel.presence.subscribe('enter', function (presmsg) { - enteredMembers.add(presmsg.clientId); - if (enteredMembers.size === 2) { - try { - expect(enteredMembers.has('one')).to.equal(true, 'Check client one entered'); - expect(enteredMembers.has('two')).to.equal(true, 'Check client two entered'); - channel.presence.unsubscribe('enter'); - cb(); - } catch (err) { - cb(err); - return; - } - } - }); - }, - function (cb) { - Helper.whenPromiseSettles(channel.presence.get(), function (err, results) { - if (err) { - cb(err); - return; - } - try { - expect(channel.presence.syncComplete, 'Check in sync').to.be.ok; - expect(results.length).to.equal(3, 'Check correct number of results'); - expect(extractClientIds(results)).deep.to.equal(['one', 'one', 'two'], 'check correct members'); - expect(extractMember(results, 'one').data).to.equal('onedata', 'check correct data on one'); - expect(extractMember(results, 'two').data).to.equal('twodata', 'check correct data on two'); - } catch (err) { - cb(err); - return; - } - cb(); - }); - }, - ], - function (err) { - helper.closeAndFinish(done, realtime, err); - }, - ); + } + }); + }); + + const results = await channel.presence.get(); + expect(channel.presence.syncComplete, 'Check in sync').to.be.ok; + expect(results.length).to.equal(3, 'Check correct number of results'); + expect(extractClientIds(results)).deep.to.equal(['one', 'one', 'two'], 'check correct members'); + expect(extractMember(results, 'one').data).to.equal('onedata', 'check correct data on one'); + expect(extractMember(results, 'two').data).to.equal('twodata', 'check correct data on two'); + realtime.close(); }); /** From a04ae5a2748fb7e28da91a7cba61c850eaa884b3 Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Fri, 25 Oct 2024 15:38:42 -0400 Subject: [PATCH 09/16] Support RTP17g1: presence auto re-enter with a different connId should not have its id set --- src/common/lib/client/realtimepresence.ts | 22 ++++++------ test/common/modules/shared_helper.js | 7 ++-- test/realtime/presence.test.js | 44 ++++++++++++++++++++++- 3 files changed, 59 insertions(+), 14 deletions(-) diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index ef02e8e78..321ab245c 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -425,16 +425,8 @@ class RealtimePresence extends EventEmitter { } _ensureMyMembersPresent(): void { - const myMembers = this._myMembers, - reenterCb = (err?: ErrorInfo | null) => { - if (err) { - const msg = 'Presence auto-re-enter failed: ' + err.toString(); - const wrappedErr = new ErrorInfo(msg, 91004, 400); - Logger.logAction(this.logger, Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', msg); - const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr); - this.channel.emit('update', change); - } - }; + const myMembers = this._myMembers; + const connId = this.channel.connectionManager.connectionId; for (const memberKey in myMembers.map) { const entry = myMembers.map[memberKey]; @@ -446,7 +438,15 @@ class RealtimePresence extends EventEmitter { ); // RTP17g: Send ENTER containing the member id, clientId and data // attributes. - Utils.whenPromiseSettles(this._enterOrUpdateClient(entry.id, entry.clientId, entry.data, 'enter'), reenterCb); + // RTP17g1: suppress id if the connId has changed + const id = (entry.connectionId === connId) ? entry.id : undefined; + this._enterOrUpdateClient(id, entry.clientId, entry.data, 'enter').catch((err) => { + const msg = 'Presence auto-re-enter failed: ' + err.toString(); + const wrappedErr = new ErrorInfo(msg, 91004, 400); + Logger.logAction(this.logger, Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', msg); + const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr); + this.channel.emit('update', change); + }); } } diff --git a/test/common/modules/shared_helper.js b/test/common/modules/shared_helper.js index 4ce973bb4..46c4732b0 100644 --- a/test/common/modules/shared_helper.js +++ b/test/common/modules/shared_helper.js @@ -257,7 +257,7 @@ define([ becomeSuspended(realtime, cb) { const helper = this.addingHelperFunction('becomeSuspended'); - helper._becomeSuspended(realtime, cb); + return helper._becomeSuspended(realtime, cb); } _becomeSuspended(realtime, cb) { @@ -268,10 +268,13 @@ define([ self.recordPrivateApi('call.connectionManager.notifyState'); realtime.connection.connectionManager.notifyState({ state: 'suspended' }); }); - if (cb) + if (cb) { realtime.connection.once('suspended', function () { cb(); }); + } else { + return realtime.connection.once('suspended'); + } } callbackOnClose(realtime, callback) { diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index adede1432..f4ee91c12 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -1,7 +1,7 @@ 'use strict'; define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { - var expect = chai.expect; + const { expect, assert } = chai; var createPM = Ably.protocolMessageFromDeserialized; var PresenceMessage = Ably.Realtime.PresenceMessage; @@ -1701,6 +1701,48 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async realtime.close(); }); + /** + * Test the auto-re-enter functionality with a resume failure resulting in a different + * connectionId (the re-entry should not have a message id) + * + * @spec RTP17g + * @spec RTP17g1 + */ + it('presence_auto_reenter_different_connid', async function () { + const helper = this.test.helper; + const channelName = 'presence_auto_reenter_different_connid'; + const realtime = helper.AblyRealtime({transportParams: {remainPresentFor: 5000}}); + const channel = realtime.channels.get(channelName); + + await realtime.connection.once('connected'); + const firstConnId = realtime.connection.id; + await channel.attach(); + // presence.get will wait for a sync if needed + await channel.presence.get() + + const pOnPresence = channel.presence.subscriptions.once('enter'); + await channel.presence.enterClient('one', 'onedata'); + const member1 = await pOnPresence; + + await helper.becomeSuspended(realtime); + assert.equal(channel.state, 'suspended', 'sanity-check channel state'); + + /* Reconnect. Since we were suspended, we will get a different connection id */ + const pOnceAttached = channel.once('attached'); + const pOnEnter = channel.presence.subscriptions.once('enter'); + const pOnLeave = channel.presence.subscriptions.once('leave'); + realtime.connection.connect(); + await pOnceAttached; + const secondConnId = realtime.connection.id; + assert.notEqual(firstConnId, secondConnId, 'sanity-check connection id changed post-suspend'); + const [enter, leave] = await Promise.all([pOnEnter, pOnLeave]); + assert.equal(leave.connectionId, firstConnId, 'Check the leave for the old connid'); + assert.equal(enter.connectionId, secondConnId, 'Check enter for new connid'); + assert.notEqual(enter.id, member1.id, 'Check the new enter did not have the msgId of the original'); + + realtime.close(); + }); + /** * Test failed presence auto-re-entering * From d9a6f4b9a6597956b4fb9673bc4be1d6cf52189f Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Thu, 7 Nov 2024 19:09:23 +0000 Subject: [PATCH 10/16] presence auto-re-enter: use ErrorInfo.cause Co-authored-by: Owen Pearson <48608556+owenpearson@users.noreply.github.com> --- src/common/lib/client/realtimepresence.ts | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 321ab245c..3a1fc62ed 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -441,9 +441,8 @@ class RealtimePresence extends EventEmitter { // RTP17g1: suppress id if the connId has changed const id = (entry.connectionId === connId) ? entry.id : undefined; this._enterOrUpdateClient(id, entry.clientId, entry.data, 'enter').catch((err) => { - const msg = 'Presence auto-re-enter failed: ' + err.toString(); - const wrappedErr = new ErrorInfo(msg, 91004, 400); - Logger.logAction(this.logger, Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', msg); + const wrappedErr = new ErrorInfo('Presence auto re-enter failed', 91004, 400, err); + Logger.logAction(this.logger, Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', 'Presence auto re-enter failed; reason = ' + Utils.inspectError(err)); const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr); this.channel.emit('update', change); }); From 2d5323d0347ee4c1243792f3b69543ca7d43205a Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Thu, 7 Nov 2024 14:13:43 -0500 Subject: [PATCH 11/16] Linter fixes --- ably.d.ts | 1 - src/common/lib/client/realtimepresence.ts | 9 +++++++-- test/realtime/presence.test.js | 8 ++++---- 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/ably.d.ts b/ably.d.ts index f9e59b1d9..cbe1df3dd 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2284,7 +2284,6 @@ export declare interface Channels { * This experimental method allows you to create custom realtime data feeds by selectively subscribing * to receive only part of the data from the channel. * See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information. - * * @param name - The channel name. * @param deriveOptions - A {@link DeriveOptions} object. * @param channelOptions - A {@link ChannelOptions} object. diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index 3a1fc62ed..dc4520da6 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -439,10 +439,15 @@ class RealtimePresence extends EventEmitter { // RTP17g: Send ENTER containing the member id, clientId and data // attributes. // RTP17g1: suppress id if the connId has changed - const id = (entry.connectionId === connId) ? entry.id : undefined; + const id = entry.connectionId === connId ? entry.id : undefined; this._enterOrUpdateClient(id, entry.clientId, entry.data, 'enter').catch((err) => { const wrappedErr = new ErrorInfo('Presence auto re-enter failed', 91004, 400, err); - Logger.logAction(this.logger, Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', 'Presence auto re-enter failed; reason = ' + Utils.inspectError(err)); + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimePresence._ensureMyMembersPresent()', + 'Presence auto re-enter failed; reason = ' + Utils.inspectError(err), + ); const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr); this.channel.emit('update', change); }); diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index f4ee91c12..2a7672e2b 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -1,7 +1,7 @@ 'use strict'; define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { - const { expect, assert } = chai; + const { expect, assert } = chai; var createPM = Ably.protocolMessageFromDeserialized; var PresenceMessage = Ably.Realtime.PresenceMessage; @@ -1636,7 +1636,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async await realtime.connection.once('connected'); await channel.attach(); // presence.get will wait for a sync if needed - await channel.presence.get() + await channel.presence.get(); const pOnPresence = channel.presence.subscriptions.once('enter'); await channel.presence.enterClient('one', 'onedata'); @@ -1711,14 +1711,14 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async it('presence_auto_reenter_different_connid', async function () { const helper = this.test.helper; const channelName = 'presence_auto_reenter_different_connid'; - const realtime = helper.AblyRealtime({transportParams: {remainPresentFor: 5000}}); + const realtime = helper.AblyRealtime({ transportParams: { remainPresentFor: 5000 } }); const channel = realtime.channels.get(channelName); await realtime.connection.once('connected'); const firstConnId = realtime.connection.id; await channel.attach(); // presence.get will wait for a sync if needed - await channel.presence.get() + await channel.presence.get(); const pOnPresence = channel.presence.subscriptions.once('enter'); await channel.presence.enterClient('one', 'onedata'); From 8248411847b36f9418d3f3abd86b96f385d31f76 Mon Sep 17 00:00:00 2001 From: Andy Ford Date: Wed, 27 Nov 2024 12:19:57 +0000 Subject: [PATCH 12/16] build: dont check submodules to build module The only git submodule we need is ably-common, and this is only used as part of testing to create test apps. We don't use it when building (despite it being a build check). This check during build prevents other SDKs from depending on a branch of ably-js, as it doesn't have the submodules, and can't init them. This change therefore removes the submodules check when building the library, allowing SDKs to depend on branch versions, if they want to. --- Gruntfile.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Gruntfile.js b/Gruntfile.js index 3bd1b0c23..bafac9f51 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -73,7 +73,7 @@ module.exports = function (grunt) { }); }); - grunt.registerTask('build', ['checkGitSubmodules', 'webpack:all', 'build:browser', 'build:node', 'build:push']); + grunt.registerTask('build', ['webpack:all', 'build:browser', 'build:node', 'build:push']); grunt.registerTask('all', ['build', 'requirejs']); From 282b188993500a50ab37cc15b78ab9df7eaa6137 Mon Sep 17 00:00:00 2001 From: Andrew Bulat Date: Fri, 25 Oct 2024 11:08:35 +0100 Subject: [PATCH 13/16] Extract common code for message processing in RealtimeChannel to a separate function Also fixes the issue with processing Presence messages, where a decoding error would prevent us from setting `connectionId`, `timestamp` and `id` fields for a message. Now this behavior is aligned with how it worked with regular messages. Resolves #1907 --- src/common/lib/client/realtimechannel.ts | 118 ++++++++++++++--------- 1 file changed, 72 insertions(+), 46 deletions(-) diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 24ffe62cf..4929295ec 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -569,34 +569,17 @@ class RealtimeChannel extends EventEmitter { if (!message.presence) break; // eslint-disable-next-line no-fallthrough case actions.PRESENCE: { - const presence = message.presence; + const presenceMessages = message.presence; - if (!presence) { + if (!presenceMessages) { break; } - const { id, connectionId, timestamp } = message; - const options = this.channelOptions; - let presenceMsg: PresenceMessage; - for (let i = 0; i < presence.length; i++) { - try { - presenceMsg = presence[i]; - await decodePresenceMessage(presenceMsg, options); - if (!presenceMsg.connectionId) presenceMsg.connectionId = connectionId; - if (!presenceMsg.timestamp) presenceMsg.timestamp = timestamp; - if (!presenceMsg.id) presenceMsg.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } - } + await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options)); + if (this._presence) { - this._presence.setPresence(presence, isSync, syncChannelSerial as any); + this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any); } break; } @@ -620,10 +603,7 @@ class RealtimeChannel extends EventEmitter { const messages = message.messages as Array, firstMessage = messages[0], - lastMessage = messages[messages.length - 1], - id = message.id, - connectionId = message.connectionId, - timestamp = message.timestamp; + lastMessage = messages[messages.length - 1]; if ( firstMessage.extras && @@ -641,36 +621,37 @@ class RealtimeChannel extends EventEmitter { break; } - for (let i = 0; i < messages.length; i++) { - const msg = messages[i]; - try { - await decodeMessage(msg, this._decodingContext); - } catch (e) { + const { unrecoverableError } = await this._decodeAndPrepareMessages( + message, + messages, + (msg) => decodeMessage(msg, this._decodingContext), + (e) => { /* decrypt failed .. the most likely cause is that we have the wrong key */ - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - switch ((e as ErrorInfo).code) { + const errorInfo = e as ErrorInfo; + + switch (errorInfo.code) { case 40018: /* decode failure */ - this._startDecodeFailureRecovery(e as ErrorInfo); - return; + this._startDecodeFailureRecovery(errorInfo); + return { unrecoverableError: true }; + case 40019: /* No vcdiff plugin passed in - no point recovering, give up */ // eslint-disable-next-line no-fallthrough case 40021: /* Browser does not support deltas, similarly no point recovering */ - this.notifyState('failed', e as ErrorInfo); - return; + this.notifyState('failed', errorInfo); + return { unrecoverableError: true }; + + default: + return { unrecoverableError: false }; } - } - if (!msg.connectionId) msg.connectionId = connectionId; - if (!msg.timestamp) msg.timestamp = timestamp; - if (!msg.id) msg.id = id + ':' + i; + }, + ); + if (unrecoverableError) { + return; } + this._lastPayload.messageId = lastMessage.id; this._lastPayload.protocolMessageChannelSerial = message.channelSerial; this.onEvent(messages); @@ -700,6 +681,51 @@ class RealtimeChannel extends EventEmitter { } } + /** + * Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data. + * + * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding + * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided + */ + private async _decodeAndPrepareMessages( + protocolMessage: ProtocolMessage, + messages: T[], + decodeFn: (msg: T) => Promise, + decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean }, + ): Promise<{ unrecoverableError: boolean }> { + const { id, connectionId, timestamp } = protocolMessage; + + for (let i = 0; i < messages.length; i++) { + const msg = messages[i]; + + try { + // decode underlying data for a message + await decodeFn(msg); + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.decodeAndPrepareMessages()', + (e as Error).toString(), + ); + + if (decodeErrorRecoveryHandler) { + const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error); + if (unrecoverableError) { + // break out of for loop by returning + return { unrecoverableError: true }; + } + } + } + + if (!msg.connectionId) msg.connectionId = connectionId; + if (!msg.timestamp) msg.timestamp = timestamp; + if (!msg.id) msg.id = id + ':' + i; + } + + return { unrecoverableError: false }; + } + _startDecodeFailureRecovery(reason: ErrorInfo): void { if (!this._lastPayload.decodeFailureRecoveryInProgress) { Logger.logAction( From 3b5b21bc442c2996245a6eb472de6e246fd79b58 Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Wed, 13 Nov 2024 20:16:08 +0000 Subject: [PATCH 14/16] Extract PresenceMap to its own file for unit-testability --- src/common/lib/client/defaultrealtime.ts | 2 + src/common/lib/client/presencemap.ts | 205 ++++++++++++++++++++++ src/common/lib/client/realtimepresence.ts | 196 +-------------------- 3 files changed, 208 insertions(+), 195 deletions(-) create mode 100644 src/common/lib/client/presencemap.ts diff --git a/src/common/lib/client/defaultrealtime.ts b/src/common/lib/client/defaultrealtime.ts index 77e3eca41..77854711a 100644 --- a/src/common/lib/client/defaultrealtime.ts +++ b/src/common/lib/client/defaultrealtime.ts @@ -11,6 +11,7 @@ import RealtimePresence from './realtimepresence'; import { DefaultPresenceMessage } from '../types/defaultpresencemessage'; import WebSocketTransport from '../transport/websockettransport'; import { FilteredSubscriptions } from './filteredsubscriptions'; +import { PresenceMap } from './presencemap'; import { fromValues as presenceMessageFromValues, fromValuesArray as presenceMessagesFromValuesArray, @@ -69,4 +70,5 @@ export class DefaultRealtime extends BaseRealtime { // Used by tests static _Http = Http; + static _PresenceMap = PresenceMap; } diff --git a/src/common/lib/client/presencemap.ts b/src/common/lib/client/presencemap.ts new file mode 100644 index 000000000..f8f4f8dc0 --- /dev/null +++ b/src/common/lib/client/presencemap.ts @@ -0,0 +1,205 @@ +import * as Utils from '../util/utils'; +import EventEmitter from '../util/eventemitter'; +import Logger from '../util/logger'; +import PresenceMessage, { fromValues as presenceMessageFromValues } from '../types/presencemessage'; + +import type RealtimePresence from './realtimepresence'; + +type compFn = (item: PresenceMessage, existing: PresenceMessage) => boolean; + +export interface RealtimePresenceParams { + waitForSync?: boolean; + clientId?: string; + connectionId?: string; +} + +function newerThan(item: PresenceMessage, existing: PresenceMessage): boolean { + /* RTP2b1: if either is synthesised, compare by timestamp */ + if (item.isSynthesized() || existing.isSynthesized()) { + // RTP2b1a: if equal, prefer the newly-arrived one + return (item.timestamp as number) >= (existing.timestamp as number); + } + + /* RTP2b2 */ + const itemOrderings = item.parseId(), + existingOrderings = existing.parseId(); + if (itemOrderings.msgSerial === existingOrderings.msgSerial) { + return itemOrderings.index > existingOrderings.index; + } else { + return itemOrderings.msgSerial > existingOrderings.msgSerial; + } +} + +export class PresenceMap extends EventEmitter { + map: Record; + residualMembers: Record | null; + syncInProgress: boolean; + presence: RealtimePresence; + memberKey: (item: PresenceMessage) => string; + newerThan: compFn; + + constructor(presence: RealtimePresence, memberKey: (item: PresenceMessage) => string, newer: compFn = newerThan) { + super(presence.logger); + this.presence = presence; + this.map = Object.create(null); + this.syncInProgress = false; + this.residualMembers = null; + this.memberKey = memberKey; + this.newerThan = newer; + } + + get(key: string) { + return this.map[key]; + } + + getClient(clientId: string) { + const map = this.map, + result = []; + for (const key in map) { + const item = map[key]; + if (item.clientId == clientId && item.action != 'absent') result.push(item); + } + return result; + } + + list(params: RealtimePresenceParams) { + const map = this.map, + clientId = params && params.clientId, + connectionId = params && params.connectionId, + result = []; + + for (const key in map) { + const item = map[key]; + if (item.action === 'absent') continue; + if (clientId && clientId != item.clientId) continue; + if (connectionId && connectionId != item.connectionId) continue; + result.push(item); + } + return result; + } + + put(item: PresenceMessage) { + if (item.action === 'enter' || item.action === 'update') { + item = presenceMessageFromValues(item); + item.action = 'present'; + } + const map = this.map, + key = this.memberKey(item); + /* we've seen this member, so do not remove it at the end of sync */ + if (this.residualMembers) delete this.residualMembers[key]; + + /* compare the timestamp of the new item with any existing member (or ABSENT witness) */ + const existingItem = map[key]; + if (existingItem && !this.newerThan(item, existingItem)) { + return false; + } + map[key] = item; + return true; + } + + values() { + const map = this.map, + result = []; + for (const key in map) { + const item = map[key]; + if (item.action != 'absent') result.push(item); + } + return result; + } + + remove(item: PresenceMessage) { + const map = this.map, + key = this.memberKey(item); + const existingItem = map[key]; + + if (existingItem && !this.newerThan(item, existingItem)) { + return false; + } + + /* RTP2f */ + if (this.syncInProgress) { + item = presenceMessageFromValues(item); + item.action = 'absent'; + map[key] = item; + } else { + delete map[key]; + } + + return true; + } + + startSync() { + const map = this.map, + syncInProgress = this.syncInProgress; + Logger.logAction( + this.logger, + Logger.LOG_MINOR, + 'PresenceMap.startSync()', + 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, + ); + /* we might be called multiple times while a sync is in progress */ + if (!this.syncInProgress) { + this.residualMembers = Utils.copy(map); + this.setInProgress(true); + } + } + + endSync() { + const map = this.map, + syncInProgress = this.syncInProgress; + Logger.logAction( + this.logger, + Logger.LOG_MINOR, + 'PresenceMap.endSync()', + 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, + ); + if (syncInProgress) { + /* we can now strip out the ABSENT members, as we have + * received all of the out-of-order sync messages */ + for (const memberKey in map) { + const entry = map[memberKey]; + if (entry.action === 'absent') { + delete map[memberKey]; + } + } + /* any members that were present at the start of the sync, + * and have not been seen in sync, can be removed, and leave events emitted */ + this.presence._synthesizeLeaves(Utils.valuesArray(this.residualMembers as Record)); + for (const memberKey in this.residualMembers) { + delete map[memberKey]; + } + this.residualMembers = null; + + /* finish, notifying any waiters */ + this.setInProgress(false); + } + this.emit('sync'); + } + + waitSync(callback: () => void) { + const syncInProgress = this.syncInProgress; + Logger.logAction( + this.logger, + Logger.LOG_MINOR, + 'PresenceMap.waitSync()', + 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, + ); + if (!syncInProgress) { + callback(); + return; + } + this.once('sync', callback); + } + + clear() { + this.map = {}; + this.setInProgress(false); + this.residualMembers = null; + } + + setInProgress(inProgress: boolean) { + Logger.logAction(this.logger, Logger.LOG_MICRO, 'PresenceMap.setInProgress()', 'inProgress = ' + inProgress); + this.syncInProgress = inProgress; + this.presence.syncComplete = !inProgress; + } +} diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index dc4520da6..64ad344f6 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -13,12 +13,7 @@ import ChannelStateChange from './channelstatechange'; import { CipherOptions } from '../types/message'; import { ErrCallback } from '../../types/utils'; import { PaginatedResult } from './paginatedresource'; - -interface RealtimePresenceParams { - waitForSync?: boolean; - clientId?: string; - connectionId?: string; -} +import { PresenceMap, RealtimePresenceParams } from './presencemap'; interface RealtimeHistoryParams { start?: number; @@ -64,23 +59,6 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: ( } } -function newerThan(item: PresenceMessage, existing: PresenceMessage) { - /* RTP2b1: if either is synthesised, compare by timestamp */ - if (item.isSynthesized() || existing.isSynthesized()) { - // RTP2b1a: if equal, prefer the newly-arrived one - return (item.timestamp as number) >= (existing.timestamp as number); - } - - /* RTP2b2 */ - const itemOrderings = item.parseId(), - existingOrderings = existing.parseId(); - if (itemOrderings.msgSerial === existingOrderings.msgSerial) { - return itemOrderings.index > existingOrderings.index; - } else { - return itemOrderings.msgSerial > existingOrderings.msgSerial; - } -} - class RealtimePresence extends EventEmitter { channel: RealtimeChannel; pendingPresence: { presence: PresenceMessage; callback: ErrCallback }[]; @@ -491,176 +469,4 @@ class RealtimePresence extends EventEmitter { } } -class PresenceMap extends EventEmitter { - map: Record; - residualMembers: Record | null; - syncInProgress: boolean; - presence: RealtimePresence; - memberKey: (item: PresenceMessage) => string; - - constructor(presence: RealtimePresence, memberKey: (item: PresenceMessage) => string) { - super(presence.logger); - this.presence = presence; - this.map = Object.create(null); - this.syncInProgress = false; - this.residualMembers = null; - this.memberKey = memberKey; - } - - get(key: string) { - return this.map[key]; - } - - getClient(clientId: string) { - const map = this.map, - result = []; - for (const key in map) { - const item = map[key]; - if (item.clientId == clientId && item.action != 'absent') result.push(item); - } - return result; - } - - list(params: RealtimePresenceParams) { - const map = this.map, - clientId = params && params.clientId, - connectionId = params && params.connectionId, - result = []; - - for (const key in map) { - const item = map[key]; - if (item.action === 'absent') continue; - if (clientId && clientId != item.clientId) continue; - if (connectionId && connectionId != item.connectionId) continue; - result.push(item); - } - return result; - } - - put(item: PresenceMessage) { - if (item.action === 'enter' || item.action === 'update') { - item = presenceMessageFromValues(item); - item.action = 'present'; - } - const map = this.map, - key = this.memberKey(item); - /* we've seen this member, so do not remove it at the end of sync */ - if (this.residualMembers) delete this.residualMembers[key]; - - /* compare the timestamp of the new item with any existing member (or ABSENT witness) */ - const existingItem = map[key]; - if (existingItem && !newerThan(item, existingItem)) { - return false; - } - map[key] = item; - return true; - } - - values() { - const map = this.map, - result = []; - for (const key in map) { - const item = map[key]; - if (item.action != 'absent') result.push(item); - } - return result; - } - - remove(item: PresenceMessage) { - const map = this.map, - key = this.memberKey(item); - const existingItem = map[key]; - - if (existingItem && !newerThan(item, existingItem)) { - return false; - } - - /* RTP2f */ - if (this.syncInProgress) { - item = presenceMessageFromValues(item); - item.action = 'absent'; - map[key] = item; - } else { - delete map[key]; - } - - return true; - } - - startSync() { - const map = this.map, - syncInProgress = this.syncInProgress; - Logger.logAction( - this.logger, - Logger.LOG_MINOR, - 'PresenceMap.startSync()', - 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, - ); - /* we might be called multiple times while a sync is in progress */ - if (!this.syncInProgress) { - this.residualMembers = Utils.copy(map); - this.setInProgress(true); - } - } - - endSync() { - const map = this.map, - syncInProgress = this.syncInProgress; - Logger.logAction( - this.logger, - Logger.LOG_MINOR, - 'PresenceMap.endSync()', - 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, - ); - if (syncInProgress) { - /* we can now strip out the ABSENT members, as we have - * received all of the out-of-order sync messages */ - for (const memberKey in map) { - const entry = map[memberKey]; - if (entry.action === 'absent') { - delete map[memberKey]; - } - } - /* any members that were present at the start of the sync, - * and have not been seen in sync, can be removed, and leave events emitted */ - this.presence._synthesizeLeaves(Utils.valuesArray(this.residualMembers as Record)); - for (const memberKey in this.residualMembers) { - delete map[memberKey]; - } - this.residualMembers = null; - - /* finish, notifying any waiters */ - this.setInProgress(false); - } - this.emit('sync'); - } - - waitSync(callback: () => void) { - const syncInProgress = this.syncInProgress; - Logger.logAction( - this.logger, - Logger.LOG_MINOR, - 'PresenceMap.waitSync()', - 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, - ); - if (!syncInProgress) { - callback(); - return; - } - this.once('sync', callback); - } - - clear() { - this.map = {}; - this.setInProgress(false); - this.residualMembers = null; - } - - setInProgress(inProgress: boolean) { - Logger.logAction(this.logger, Logger.LOG_MICRO, 'PresenceMap.setInProgress()', 'inProgress = ' + inProgress); - this.syncInProgress = inProgress; - this.presence.syncComplete = !inProgress; - } -} - export default RealtimePresence; From b18a35a7ca93807df107e2152b65f996d069f993 Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Wed, 13 Nov 2024 20:16:22 +0000 Subject: [PATCH 15/16] PresenceMap: only emit a leave if a member was present per https://github.com/ably/specification/pull/222/ --- .mocharc.js | 2 +- src/common/lib/client/presencemap.ts | 2 +- test/unit/presencemap.test.js | 50 ++++++++++++++++++++++++++++ 3 files changed, 52 insertions(+), 2 deletions(-) create mode 100644 test/unit/presencemap.test.js diff --git a/.mocharc.js b/.mocharc.js index fb2c98119..a7201346a 100644 --- a/.mocharc.js +++ b/.mocharc.js @@ -9,7 +9,7 @@ const config = { // if you've defined specs in your config. therefore we work around it by only adding specs to the // config if none are passed as arguments if (!process.argv.slice(2).some(isTestFile)) { - config.spec = ['test/realtime/*.test.js', 'test/rest/*.test.js']; + config.spec = ['test/realtime/*.test.js', 'test/rest/*.test.js', 'test/unit/*.test.js']; } function isTestFile(arg) { diff --git a/src/common/lib/client/presencemap.ts b/src/common/lib/client/presencemap.ts index f8f4f8dc0..793fde54b 100644 --- a/src/common/lib/client/presencemap.ts +++ b/src/common/lib/client/presencemap.ts @@ -125,7 +125,7 @@ export class PresenceMap extends EventEmitter { delete map[key]; } - return true; + return !!existingItem; } startSync() { diff --git a/test/unit/presencemap.test.js b/test/unit/presencemap.test.js new file mode 100644 index 000000000..713669dc4 --- /dev/null +++ b/test/unit/presencemap.test.js @@ -0,0 +1,50 @@ +'use strict'; + +define(['chai', 'ably'], function (chai, Ably) { + const { assert } = chai; + const PresenceMap = Ably.Realtime._PresenceMap; + + class MockRealtimePresence {} + + describe('PresenceMap', () => { + let presenceMap; + + // Helper function to create a presence message + const createPresenceMessage = (clientId, connectionId, action, timestamp) => ({ + clientId, + connectionId, + timestamp, + action, + }); + + beforeEach(() => { + // Initialize with a simple memberKey function that uses clientId as the key + presenceMap = new PresenceMap( + new MockRealtimePresence(), + (item) => item.clientId + ':' + item.connectionId, + (i, j) => i.timestamp > j.timestamp, + ); + }); + + describe('remove()', () => { + it('should return false when no matching member present', () => { + const incoming = createPresenceMessage('client1', 'conn1', 'leave', 100); + assert.isFalse(presenceMap.remove(incoming)); + }); + + it('should return true when removing an (older) matching member', () => { + const original = createPresenceMessage('client1', 'conn1', 'present', 100); + presenceMap.put(original); + const incoming = createPresenceMessage('client1', 'conn1', 'leave', 150); + assert.isTrue(presenceMap.remove(incoming)); + }); + + it('should return false when trying to remove a newer matching member', () => { + const original = createPresenceMessage('client1', 'conn1', 'present', 100); + presenceMap.put(original); + const incoming = createPresenceMessage('client1', 'conn1', 'leave', 50); + assert.isFalse(presenceMap.remove(incoming)); + }); + }); + }); +}); From 3cb1699c09a75010a08cfb7ca945c69db8f43bca Mon Sep 17 00:00:00 2001 From: Simon Woolf Date: Wed, 13 Nov 2024 20:51:06 +0000 Subject: [PATCH 16/16] presenceEnterUpdate: fix flaky test --- test/realtime/presence.test.js | 40 +++++++++++----------------------- 1 file changed, 13 insertions(+), 27 deletions(-) diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index 2a7672e2b..231219ea3 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -568,34 +568,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * * @spec RTP10 */ - it('presenceEnterAndLeave', function (done) { + it('presenceEnterAndLeave', async function () { const helper = this.test.helper; - var channelName = 'enterAndLeave'; - var enterAndLeave = function (cb) { - var clientRealtime = helper.AblyRealtime({ clientId: testClientId, tokenDetails: authToken }); - clientRealtime.connection.on('connected', function () { - /* get channel, attach, and enter */ - var clientChannel = clientRealtime.channels.get(channelName); - Helper.whenPromiseSettles(clientChannel.attach(), function (err) { - if (err) { - cb(err, clientRealtime); - return; - } - Helper.whenPromiseSettles(clientChannel.presence.enter('Test client data (leave0)'), function (err) { - if (err) { - cb(err, clientRealtime); - return; - } - }); - Helper.whenPromiseSettles(clientChannel.presence.leave(), function (err) { - cb(err, clientRealtime); - }); - }); - }); - helper.monitorConnection(done, clientRealtime); - }; - - runTestWithEventListener(done, helper, channelName, listenerFor('leave'), enterAndLeave); + const channelName = 'enterAndLeave'; + const clientRealtime = helper.AblyRealtime({ clientId: testClientId, tokenDetails: authToken }); + await clientRealtime.connection.whenState('connected'); + /* get channel, attach, and enter */ + const clientChannel = clientRealtime.channels.get(channelName); + await clientChannel.attach(); + await Promise.all([ + clientChannel.presence.enter('Test client data (leave0)'), + clientChannel.presence.subscriptions.once('enter'), + ]); + await Promise.all([clientChannel.presence.leave(), clientChannel.presence.subscriptions.once('leave')]); + clientRealtime.close(); }); /**