diff --git a/.mocharc.js b/.mocharc.js index fb2c98119..a7201346a 100644 --- a/.mocharc.js +++ b/.mocharc.js @@ -9,7 +9,7 @@ const config = { // if you've defined specs in your config. therefore we work around it by only adding specs to the // config if none are passed as arguments if (!process.argv.slice(2).some(isTestFile)) { - config.spec = ['test/realtime/*.test.js', 'test/rest/*.test.js']; + config.spec = ['test/realtime/*.test.js', 'test/rest/*.test.js', 'test/unit/*.test.js']; } function isTestFile(arg) { diff --git a/CHANGELOG.md b/CHANGELOG.md index 39e1f7b36..6338fc79b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,17 @@ This contains only the most important and/or user-facing changes; for a full changelog, see the commit history. +## [2.5.0](https://github.com/ably/ably-js/tree/2.5.0) (2024-11-06) + +With this release, ably-js will now expose the new `Message` attributes needed to support upcoming features, +such as updates, deletions and annotations in the Ably service. + +- Added support for new `Message` attributes. [\#1888](https://github.com/ably/ably-js/pull/1888) + +## [2.4.1](https://github.com/ably/ably-js/tree/2.4.1) (2024-10-04) + +- Fix `usePresence` hook wasn't leaving presence if component unmounted during channel attaching state [\#1884](https://github.com/ably/ably-js/pull/1884) + ## [2.4.0](https://github.com/ably/ably-js/tree/2.4.0) (2024-09-11) - Add `wsConnectivityCheckUrl` client option [\#1862](https://github.com/ably/ably-js/pull/1862) diff --git a/Gruntfile.js b/Gruntfile.js index fdace117d..747c74d62 100644 --- a/Gruntfile.js +++ b/Gruntfile.js @@ -73,14 +73,7 @@ module.exports = function (grunt) { }); }); - grunt.registerTask('build', [ - 'checkGitSubmodules', - 'webpack:all', - 'build:browser', - 'build:node', - 'build:push', - 'build:liveobjects', - ]); + grunt.registerTask('build', ['webpack:all', 'build:browser', 'build:node', 'build:push', 'build:liveobjects']); grunt.registerTask('all', ['build', 'requirejs']); diff --git a/README.md b/README.md index 78a2b81a2..f69d00fd6 100644 --- a/README.md +++ b/README.md @@ -676,6 +676,26 @@ The issue is coming from the fact that when using App Router specifically depend Using `serverComponentsExternalPackages` opt-outs from using Next.js bundling for specific packages and uses native Node.js `require` instead. This is a common problem in App Router for a number of packages (for example, see next.js issue [vercel/next.js#52876](https://github.com/vercel/next.js/issues/52876)), and using `serverComponentsExternalPackages` is the recommended approach here. +#### "Connection limit exceeded" error during development + +If you're encountering a "Connection limit exceeded" error when trying to connect to Ably servers during the development of your application, and you notice spikes or linear increases in the connection count on the Ably dashboard for your app, this may be due to one of the following reasons: + +- If you're using Next.js, your `Ably.Realtime` client instance may be created multiple times on the server side (i.e., in a Node.js process) as you're developing your app, due to Next.js server side rendering your components. Note that even for "Client Components" (i.e., components with the 'use client' directive), [Next.js may still run the component code on the server in order to pre-render HTML](https://nextjs.org/docs/app/building-your-application/rendering/client-components#how-are-client-components-rendered). Depending on your client configuration options, those clients may also successfully open a connection to Ably servers from that Node.js process, which won't close until you restart your development server. + + The simplest fix is to use the `autoConnect` client option and check if the client is created on the server side with a simple window object check, like this: + + ```typescript + const client = new Ably.Realtime({ key: 'your-ably-api-key', autoConnect: typeof window !== 'undefined' }); + ``` + + This will prevent the client from connecting to Ably servers if it is created on the server side, while not affecting your client side components. + +- If you're using any React-based framework, you may be recreating the `Ably.Realtime` client instance on every component re-render. To avoid this, and to prevent potentially reaching the maximum connections limit on your account, move the client instantiation (`new Ably.Realtime`) outside of your components. You can find an example in our [React docs](./docs/react.md#Usage). + +- The connection limit error can be caused by the Hot Reloading mechanism of your development environment (called Fast Refresh in newer Next.js versions, or more generally, Hot Module Replacement - HMR). When you edit and save a file that contains a `new Ably.Realtime()` call in an environment that supports HMR (such as React, Vite, or Next.js apps), the file gets refreshed and creates a new `Ably.Realtime` client instance. However, the previous client remains in memory, unaware of the replacement, and stays connected to Ably's realtime systems. As a result, your connection count will keep increasing with each file edit as new clients are created. This only resets when you manually refresh the browser page, which closes all clients. This behavior applies to any development environment with an HMR mechanism implemented. + + The solution is simple: move the `new Ably.Realtime()` call to a separate file, such as `ably-client.js`, and export the client instance from there. This way, the client instance will only be recreated when you specifically make changes to the `ably-client.js` file, which should be far less frequent than changes in the rest of the codebase. + ## Contributing For guidance on how to contribute to this project, see the [CONTRIBUTING.md](CONTRIBUTING.md). diff --git a/ably.d.ts b/ably.d.ts index 5f375b1ec..5d1aa1f82 100644 --- a/ably.d.ts +++ b/ably.d.ts @@ -2483,7 +2483,6 @@ export declare interface Channels { * This experimental method allows you to create custom realtime data feeds by selectively subscribing * to receive only part of the data from the channel. * See the [announcement post](https://pages.ably.com/subscription-filters-preview) for more information. - * * @param name - The channel name. * @param deriveOptions - A {@link DeriveOptions} object. * @param channelOptions - A {@link ChannelOptions} object. @@ -2534,12 +2533,104 @@ export interface Message { * Timestamp of when the message was received by Ably, as milliseconds since the Unix epoch. */ timestamp?: number; + /** + * The action type of the message, one of the {@link MessageAction} enum values. + */ + action?: MessageAction; + /** + * This message's unique serial. + */ + serial?: string; + /** + * The serial of the message that this message is a reference to. + */ + refSerial?: string; + /** + * The type of reference this message is, in relation to the message it references. + */ + refType?: string; + /** + * If an `update` operation was applied to this message, this will be the timestamp the update occurred. + */ + updatedAt?: number; + /** + * The serial of the operation that updated this message. + */ + updateSerial?: string; + /** + * If this message resulted from an operation, this will contain the operation details. + */ + operation?: Operation; +} + +/** + * Contains the details of an operation, such as update or deletion, supplied by the actioning client. + */ +export interface Operation { + /** + * The client ID of the client that initiated the operation. + */ + clientId?: string; + /** + * The description provided by the client that initiated the operation. + */ + description?: string; + /** + * A JSON object of string key-value pairs that may contain metadata associated with the operation. + */ + metadata?: Record; +} + +/** + * The namespace containing the different types of message actions. + */ +declare namespace MessageActions { + /** + * Message action has not been set. + */ + type MESSAGE_UNSET = 'message.unset'; + /** + * Message action for a newly created message. + */ + type MESSAGE_CREATE = 'message.create'; + /** + * Message action for an updated message. + */ + type MESSAGE_UPDATE = 'message.update'; + /** + * Message action for a deleted message. + */ + type MESSAGE_DELETE = 'message.delete'; + /** + * Message action for a newly created annotation. + */ + type ANNOTATION_CREATE = 'annotation.create'; + /** + * Message action for a deleted annotation. + */ + type ANNOTATION_DELETE = 'annotation.delete'; + /** + * Message action for a meta-message that contains channel occupancy information. + */ + type META_OCCUPANCY = 'meta.occupancy'; } +/** + * Describes the possible action types used on an {@link Message}. + */ +export type MessageAction = + | MessageActions.MESSAGE_UNSET + | MessageActions.MESSAGE_CREATE + | MessageActions.MESSAGE_UPDATE + | MessageActions.MESSAGE_DELETE + | MessageActions.ANNOTATION_CREATE + | MessageActions.ANNOTATION_DELETE + | MessageActions.META_OCCUPANCY; + /** * A message received from Ably. */ -export type InboundMessage = Message & Required>; +export type InboundMessage = Message & Required>; /** * Static utilities related to messages. diff --git a/docs/react.md b/docs/react.md index bd0d32afa..1b2630e00 100644 --- a/docs/react.md +++ b/docs/react.md @@ -36,6 +36,7 @@ The hooks are compatible with all versions of React above 16.8.0 Start by connecting your app to Ably using the `AblyProvider` component. See the [`ClientOptions` documentation](https://ably.com/docs/api/realtime-sdk/types?lang=javascript) for information about what options are available when creating an Ably client. If you want to use the `usePresence` or `usePresenceListener` hooks, you'll need to explicitly provide a `clientId`. The `AblyProvider` should be high in your component tree, wrapping every component which needs to access Ably. +Also, ensure that the `Ably.Realtime` instance is created outside of components to prevent it from being recreated on component re-renders. This will help avoid opening extra unnecessary connections to the Ably servers and potentially reaching the maximum connections limit on your account. ```jsx import { AblyProvider } from 'ably/react'; diff --git a/package-lock.json b/package-lock.json index e6fa48ce9..2393a5194 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "ably", - "version": "2.4.0", + "version": "2.5.0", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "ably", - "version": "2.4.0", + "version": "2.5.0", "license": "Apache-2.0", "dependencies": { "@ably/msgpack-js": "^0.4.0", diff --git a/package.json b/package.json index b8f9ae029..6f89d1cfa 100644 --- a/package.json +++ b/package.json @@ -1,7 +1,7 @@ { "name": "ably", "description": "Realtime client library for Ably, the realtime messaging service", - "version": "2.4.0", + "version": "2.5.0", "license": "Apache-2.0", "bugs": { "url": "https://github.com/ably/ably-js/issues", @@ -146,6 +146,7 @@ "grunt": "grunt", "test": "npm run test:node", "test:node": "npm run build:node && npm run build:push && npm run build:liveobjects && mocha", + "test:grep": "npm run build:node && npm run build:push && npm run build:liveobjects && mocha --grep", "test:node:skip-build": "mocha", "test:webserver": "grunt test:webserver", "test:playwright": "node test/support/runPlaywrightTests.js", diff --git a/src/common/lib/client/defaultrealtime.ts b/src/common/lib/client/defaultrealtime.ts index 77e3eca41..77854711a 100644 --- a/src/common/lib/client/defaultrealtime.ts +++ b/src/common/lib/client/defaultrealtime.ts @@ -11,6 +11,7 @@ import RealtimePresence from './realtimepresence'; import { DefaultPresenceMessage } from '../types/defaultpresencemessage'; import WebSocketTransport from '../transport/websockettransport'; import { FilteredSubscriptions } from './filteredsubscriptions'; +import { PresenceMap } from './presencemap'; import { fromValues as presenceMessageFromValues, fromValuesArray as presenceMessagesFromValuesArray, @@ -69,4 +70,5 @@ export class DefaultRealtime extends BaseRealtime { // Used by tests static _Http = Http; + static _PresenceMap = PresenceMap; } diff --git a/src/common/lib/client/presencemap.ts b/src/common/lib/client/presencemap.ts new file mode 100644 index 000000000..793fde54b --- /dev/null +++ b/src/common/lib/client/presencemap.ts @@ -0,0 +1,205 @@ +import * as Utils from '../util/utils'; +import EventEmitter from '../util/eventemitter'; +import Logger from '../util/logger'; +import PresenceMessage, { fromValues as presenceMessageFromValues } from '../types/presencemessage'; + +import type RealtimePresence from './realtimepresence'; + +type compFn = (item: PresenceMessage, existing: PresenceMessage) => boolean; + +export interface RealtimePresenceParams { + waitForSync?: boolean; + clientId?: string; + connectionId?: string; +} + +function newerThan(item: PresenceMessage, existing: PresenceMessage): boolean { + /* RTP2b1: if either is synthesised, compare by timestamp */ + if (item.isSynthesized() || existing.isSynthesized()) { + // RTP2b1a: if equal, prefer the newly-arrived one + return (item.timestamp as number) >= (existing.timestamp as number); + } + + /* RTP2b2 */ + const itemOrderings = item.parseId(), + existingOrderings = existing.parseId(); + if (itemOrderings.msgSerial === existingOrderings.msgSerial) { + return itemOrderings.index > existingOrderings.index; + } else { + return itemOrderings.msgSerial > existingOrderings.msgSerial; + } +} + +export class PresenceMap extends EventEmitter { + map: Record; + residualMembers: Record | null; + syncInProgress: boolean; + presence: RealtimePresence; + memberKey: (item: PresenceMessage) => string; + newerThan: compFn; + + constructor(presence: RealtimePresence, memberKey: (item: PresenceMessage) => string, newer: compFn = newerThan) { + super(presence.logger); + this.presence = presence; + this.map = Object.create(null); + this.syncInProgress = false; + this.residualMembers = null; + this.memberKey = memberKey; + this.newerThan = newer; + } + + get(key: string) { + return this.map[key]; + } + + getClient(clientId: string) { + const map = this.map, + result = []; + for (const key in map) { + const item = map[key]; + if (item.clientId == clientId && item.action != 'absent') result.push(item); + } + return result; + } + + list(params: RealtimePresenceParams) { + const map = this.map, + clientId = params && params.clientId, + connectionId = params && params.connectionId, + result = []; + + for (const key in map) { + const item = map[key]; + if (item.action === 'absent') continue; + if (clientId && clientId != item.clientId) continue; + if (connectionId && connectionId != item.connectionId) continue; + result.push(item); + } + return result; + } + + put(item: PresenceMessage) { + if (item.action === 'enter' || item.action === 'update') { + item = presenceMessageFromValues(item); + item.action = 'present'; + } + const map = this.map, + key = this.memberKey(item); + /* we've seen this member, so do not remove it at the end of sync */ + if (this.residualMembers) delete this.residualMembers[key]; + + /* compare the timestamp of the new item with any existing member (or ABSENT witness) */ + const existingItem = map[key]; + if (existingItem && !this.newerThan(item, existingItem)) { + return false; + } + map[key] = item; + return true; + } + + values() { + const map = this.map, + result = []; + for (const key in map) { + const item = map[key]; + if (item.action != 'absent') result.push(item); + } + return result; + } + + remove(item: PresenceMessage) { + const map = this.map, + key = this.memberKey(item); + const existingItem = map[key]; + + if (existingItem && !this.newerThan(item, existingItem)) { + return false; + } + + /* RTP2f */ + if (this.syncInProgress) { + item = presenceMessageFromValues(item); + item.action = 'absent'; + map[key] = item; + } else { + delete map[key]; + } + + return !!existingItem; + } + + startSync() { + const map = this.map, + syncInProgress = this.syncInProgress; + Logger.logAction( + this.logger, + Logger.LOG_MINOR, + 'PresenceMap.startSync()', + 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, + ); + /* we might be called multiple times while a sync is in progress */ + if (!this.syncInProgress) { + this.residualMembers = Utils.copy(map); + this.setInProgress(true); + } + } + + endSync() { + const map = this.map, + syncInProgress = this.syncInProgress; + Logger.logAction( + this.logger, + Logger.LOG_MINOR, + 'PresenceMap.endSync()', + 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, + ); + if (syncInProgress) { + /* we can now strip out the ABSENT members, as we have + * received all of the out-of-order sync messages */ + for (const memberKey in map) { + const entry = map[memberKey]; + if (entry.action === 'absent') { + delete map[memberKey]; + } + } + /* any members that were present at the start of the sync, + * and have not been seen in sync, can be removed, and leave events emitted */ + this.presence._synthesizeLeaves(Utils.valuesArray(this.residualMembers as Record)); + for (const memberKey in this.residualMembers) { + delete map[memberKey]; + } + this.residualMembers = null; + + /* finish, notifying any waiters */ + this.setInProgress(false); + } + this.emit('sync'); + } + + waitSync(callback: () => void) { + const syncInProgress = this.syncInProgress; + Logger.logAction( + this.logger, + Logger.LOG_MINOR, + 'PresenceMap.waitSync()', + 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, + ); + if (!syncInProgress) { + callback(); + return; + } + this.once('sync', callback); + } + + clear() { + this.map = {}; + this.setInProgress(false); + this.residualMembers = null; + } + + setInProgress(inProgress: boolean) { + Logger.logAction(this.logger, Logger.LOG_MICRO, 'PresenceMap.setInProgress()', 'inProgress = ' + inProgress); + this.syncInProgress = inProgress; + this.presence.syncComplete = !inProgress; + } +} diff --git a/src/common/lib/client/realtimechannel.ts b/src/common/lib/client/realtimechannel.ts index 4cdcd2312..1b75fc06e 100644 --- a/src/common/lib/client/realtimechannel.ts +++ b/src/common/lib/client/realtimechannel.ts @@ -589,34 +589,17 @@ class RealtimeChannel extends EventEmitter { if (!message.presence) break; // eslint-disable-next-line no-fallthrough case actions.PRESENCE: { - const presence = message.presence; + const presenceMessages = message.presence; - if (!presence) { + if (!presenceMessages) { break; } - const { id, connectionId, timestamp } = message; - const options = this.channelOptions; - let presenceMsg: PresenceMessage; - for (let i = 0; i < presence.length; i++) { - try { - presenceMsg = presence[i]; - await decodePresenceMessage(presenceMsg, options); - if (!presenceMsg.connectionId) presenceMsg.connectionId = connectionId; - if (!presenceMsg.timestamp) presenceMsg.timestamp = timestamp; - if (!presenceMsg.id) presenceMsg.id = id + ':' + i; - } catch (e) { - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - } - } + await this._decodeAndPrepareMessages(message, presenceMessages, (msg) => decodePresenceMessage(msg, options)); + if (this._presence) { - this._presence.setPresence(presence, isSync, syncChannelSerial as any); + this._presence.setPresence(presenceMessages, isSync, syncChannelSerial as any); } break; } @@ -707,10 +690,7 @@ class RealtimeChannel extends EventEmitter { const messages = message.messages as Array, firstMessage = messages[0], - lastMessage = messages[messages.length - 1], - id = message.id, - connectionId = message.connectionId, - timestamp = message.timestamp; + lastMessage = messages[messages.length - 1]; if ( firstMessage.extras && @@ -728,36 +708,37 @@ class RealtimeChannel extends EventEmitter { break; } - for (let i = 0; i < messages.length; i++) { - const msg = messages[i]; - try { - await decodeMessage(msg, this._decodingContext); - } catch (e) { + const { unrecoverableError } = await this._decodeAndPrepareMessages( + message, + messages, + (msg) => decodeMessage(msg, this._decodingContext), + (e) => { /* decrypt failed .. the most likely cause is that we have the wrong key */ - Logger.logAction( - this.logger, - Logger.LOG_ERROR, - 'RealtimeChannel.processMessage()', - (e as Error).toString(), - ); - switch ((e as ErrorInfo).code) { + const errorInfo = e as ErrorInfo; + + switch (errorInfo.code) { case 40018: /* decode failure */ - this._startDecodeFailureRecovery(e as ErrorInfo); - return; + this._startDecodeFailureRecovery(errorInfo); + return { unrecoverableError: true }; + case 40019: /* No vcdiff plugin passed in - no point recovering, give up */ // eslint-disable-next-line no-fallthrough case 40021: /* Browser does not support deltas, similarly no point recovering */ - this.notifyState('failed', e as ErrorInfo); - return; + this.notifyState('failed', errorInfo); + return { unrecoverableError: true }; + + default: + return { unrecoverableError: false }; } - } - if (!msg.connectionId) msg.connectionId = connectionId; - if (!msg.timestamp) msg.timestamp = timestamp; - if (!msg.id) msg.id = id + ':' + i; + }, + ); + if (unrecoverableError) { + return; } + this._lastPayload.messageId = lastMessage.id; this._lastPayload.protocolMessageChannelSerial = message.channelSerial; this.onEvent(messages); @@ -787,6 +768,51 @@ class RealtimeChannel extends EventEmitter { } } + /** + * Mutates provided messages by adding `connectionId`, `timestamp` and `id` fields, and decoding message data. + * + * @returns `unrecoverableError` flag. If `true` indicates that unrecoverable error was encountered during message decoding + * and any further message processing should be stopped. Always equals to `false` if `decodeErrorRecoveryHandler` was not provided + */ + private async _decodeAndPrepareMessages( + protocolMessage: ProtocolMessage, + messages: T[], + decodeFn: (msg: T) => Promise, + decodeErrorRecoveryHandler?: (e: Error) => { unrecoverableError: boolean }, + ): Promise<{ unrecoverableError: boolean }> { + const { id, connectionId, timestamp } = protocolMessage; + + for (let i = 0; i < messages.length; i++) { + const msg = messages[i]; + + try { + // decode underlying data for a message + await decodeFn(msg); + } catch (e) { + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimeChannel.decodeAndPrepareMessages()', + (e as Error).toString(), + ); + + if (decodeErrorRecoveryHandler) { + const { unrecoverableError } = decodeErrorRecoveryHandler(e as Error); + if (unrecoverableError) { + // break out of for loop by returning + return { unrecoverableError: true }; + } + } + } + + if (!msg.connectionId) msg.connectionId = connectionId; + if (!msg.timestamp) msg.timestamp = timestamp; + if (!msg.id) msg.id = id + ':' + i; + } + + return { unrecoverableError: false }; + } + _startDecodeFailureRecovery(reason: ErrorInfo): void { if (!this._lastPayload.decodeFailureRecoveryInProgress) { Logger.logAction( diff --git a/src/common/lib/client/realtimepresence.ts b/src/common/lib/client/realtimepresence.ts index ef02e8e78..64ad344f6 100644 --- a/src/common/lib/client/realtimepresence.ts +++ b/src/common/lib/client/realtimepresence.ts @@ -13,12 +13,7 @@ import ChannelStateChange from './channelstatechange'; import { CipherOptions } from '../types/message'; import { ErrCallback } from '../../types/utils'; import { PaginatedResult } from './paginatedresource'; - -interface RealtimePresenceParams { - waitForSync?: boolean; - clientId?: string; - connectionId?: string; -} +import { PresenceMap, RealtimePresenceParams } from './presencemap'; interface RealtimeHistoryParams { start?: number; @@ -64,23 +59,6 @@ function waitAttached(channel: RealtimeChannel, callback: ErrCallback, action: ( } } -function newerThan(item: PresenceMessage, existing: PresenceMessage) { - /* RTP2b1: if either is synthesised, compare by timestamp */ - if (item.isSynthesized() || existing.isSynthesized()) { - // RTP2b1a: if equal, prefer the newly-arrived one - return (item.timestamp as number) >= (existing.timestamp as number); - } - - /* RTP2b2 */ - const itemOrderings = item.parseId(), - existingOrderings = existing.parseId(); - if (itemOrderings.msgSerial === existingOrderings.msgSerial) { - return itemOrderings.index > existingOrderings.index; - } else { - return itemOrderings.msgSerial > existingOrderings.msgSerial; - } -} - class RealtimePresence extends EventEmitter { channel: RealtimeChannel; pendingPresence: { presence: PresenceMessage; callback: ErrCallback }[]; @@ -425,16 +403,8 @@ class RealtimePresence extends EventEmitter { } _ensureMyMembersPresent(): void { - const myMembers = this._myMembers, - reenterCb = (err?: ErrorInfo | null) => { - if (err) { - const msg = 'Presence auto-re-enter failed: ' + err.toString(); - const wrappedErr = new ErrorInfo(msg, 91004, 400); - Logger.logAction(this.logger, Logger.LOG_ERROR, 'RealtimePresence._ensureMyMembersPresent()', msg); - const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr); - this.channel.emit('update', change); - } - }; + const myMembers = this._myMembers; + const connId = this.channel.connectionManager.connectionId; for (const memberKey in myMembers.map) { const entry = myMembers.map[memberKey]; @@ -446,7 +416,19 @@ class RealtimePresence extends EventEmitter { ); // RTP17g: Send ENTER containing the member id, clientId and data // attributes. - Utils.whenPromiseSettles(this._enterOrUpdateClient(entry.id, entry.clientId, entry.data, 'enter'), reenterCb); + // RTP17g1: suppress id if the connId has changed + const id = entry.connectionId === connId ? entry.id : undefined; + this._enterOrUpdateClient(id, entry.clientId, entry.data, 'enter').catch((err) => { + const wrappedErr = new ErrorInfo('Presence auto re-enter failed', 91004, 400, err); + Logger.logAction( + this.logger, + Logger.LOG_ERROR, + 'RealtimePresence._ensureMyMembersPresent()', + 'Presence auto re-enter failed; reason = ' + Utils.inspectError(err), + ); + const change = new ChannelStateChange(this.channel.state, this.channel.state, true, false, wrappedErr); + this.channel.emit('update', change); + }); } } @@ -487,176 +469,4 @@ class RealtimePresence extends EventEmitter { } } -class PresenceMap extends EventEmitter { - map: Record; - residualMembers: Record | null; - syncInProgress: boolean; - presence: RealtimePresence; - memberKey: (item: PresenceMessage) => string; - - constructor(presence: RealtimePresence, memberKey: (item: PresenceMessage) => string) { - super(presence.logger); - this.presence = presence; - this.map = Object.create(null); - this.syncInProgress = false; - this.residualMembers = null; - this.memberKey = memberKey; - } - - get(key: string) { - return this.map[key]; - } - - getClient(clientId: string) { - const map = this.map, - result = []; - for (const key in map) { - const item = map[key]; - if (item.clientId == clientId && item.action != 'absent') result.push(item); - } - return result; - } - - list(params: RealtimePresenceParams) { - const map = this.map, - clientId = params && params.clientId, - connectionId = params && params.connectionId, - result = []; - - for (const key in map) { - const item = map[key]; - if (item.action === 'absent') continue; - if (clientId && clientId != item.clientId) continue; - if (connectionId && connectionId != item.connectionId) continue; - result.push(item); - } - return result; - } - - put(item: PresenceMessage) { - if (item.action === 'enter' || item.action === 'update') { - item = presenceMessageFromValues(item); - item.action = 'present'; - } - const map = this.map, - key = this.memberKey(item); - /* we've seen this member, so do not remove it at the end of sync */ - if (this.residualMembers) delete this.residualMembers[key]; - - /* compare the timestamp of the new item with any existing member (or ABSENT witness) */ - const existingItem = map[key]; - if (existingItem && !newerThan(item, existingItem)) { - return false; - } - map[key] = item; - return true; - } - - values() { - const map = this.map, - result = []; - for (const key in map) { - const item = map[key]; - if (item.action != 'absent') result.push(item); - } - return result; - } - - remove(item: PresenceMessage) { - const map = this.map, - key = this.memberKey(item); - const existingItem = map[key]; - - if (existingItem && !newerThan(item, existingItem)) { - return false; - } - - /* RTP2f */ - if (this.syncInProgress) { - item = presenceMessageFromValues(item); - item.action = 'absent'; - map[key] = item; - } else { - delete map[key]; - } - - return true; - } - - startSync() { - const map = this.map, - syncInProgress = this.syncInProgress; - Logger.logAction( - this.logger, - Logger.LOG_MINOR, - 'PresenceMap.startSync()', - 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, - ); - /* we might be called multiple times while a sync is in progress */ - if (!this.syncInProgress) { - this.residualMembers = Utils.copy(map); - this.setInProgress(true); - } - } - - endSync() { - const map = this.map, - syncInProgress = this.syncInProgress; - Logger.logAction( - this.logger, - Logger.LOG_MINOR, - 'PresenceMap.endSync()', - 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, - ); - if (syncInProgress) { - /* we can now strip out the ABSENT members, as we have - * received all of the out-of-order sync messages */ - for (const memberKey in map) { - const entry = map[memberKey]; - if (entry.action === 'absent') { - delete map[memberKey]; - } - } - /* any members that were present at the start of the sync, - * and have not been seen in sync, can be removed, and leave events emitted */ - this.presence._synthesizeLeaves(Utils.valuesArray(this.residualMembers as Record)); - for (const memberKey in this.residualMembers) { - delete map[memberKey]; - } - this.residualMembers = null; - - /* finish, notifying any waiters */ - this.setInProgress(false); - } - this.emit('sync'); - } - - waitSync(callback: () => void) { - const syncInProgress = this.syncInProgress; - Logger.logAction( - this.logger, - Logger.LOG_MINOR, - 'PresenceMap.waitSync()', - 'channel = ' + this.presence.channel.name + '; syncInProgress = ' + syncInProgress, - ); - if (!syncInProgress) { - callback(); - return; - } - this.once('sync', callback); - } - - clear() { - this.map = {}; - this.setInProgress(false); - this.residualMembers = null; - } - - setInProgress(inProgress: boolean) { - Logger.logAction(this.logger, Logger.LOG_MICRO, 'PresenceMap.setInProgress()', 'inProgress = ' + inProgress); - this.syncInProgress = inProgress; - this.presence.syncComplete = !inProgress; - } -} - export default RealtimePresence; diff --git a/src/common/lib/client/restchannel.ts b/src/common/lib/client/restchannel.ts index 3138e8611..e27133c6e 100644 --- a/src/common/lib/client/restchannel.ts +++ b/src/common/lib/client/restchannel.ts @@ -2,12 +2,12 @@ import * as Utils from '../util/utils'; import Logger from '../util/logger'; import RestPresence from './restpresence'; import Message, { - fromValues as messageFromValues, - fromValuesArray as messagesFromValuesArray, encodeArray as encodeMessagesArray, serialize as serializeMessage, getMessagesSize, CipherOptions, + fromValues as messageFromValues, + fromValuesArray as messagesFromValuesArray, } from '../types/message'; import ErrorInfo from '../types/errorinfo'; import { PaginatedResult } from './paginatedresource'; diff --git a/src/common/lib/types/defaultmessage.ts b/src/common/lib/types/defaultmessage.ts index dfc4a02b1..79fffccf5 100644 --- a/src/common/lib/types/defaultmessage.ts +++ b/src/common/lib/types/defaultmessage.ts @@ -1,10 +1,11 @@ import Message, { CipherOptions, - fromEncoded, - fromEncodedArray, - encode, decode, + encode, EncodingDecodingContext, + fromEncoded, + fromEncodedArray, + fromValues, } from './message'; import * as API from '../../../../ably'; import Platform from 'common/platform'; @@ -25,8 +26,8 @@ export class DefaultMessage extends Message { } // Used by tests - static fromValues(values: unknown): Message { - return Object.assign(new Message(), values); + static fromValues(values: Message | Record, options?: { stringifyAction?: boolean }): Message { + return fromValues(values, options); } // Used by tests diff --git a/src/common/lib/types/message.ts b/src/common/lib/types/message.ts index c2a1f1cdb..a1a449942 100644 --- a/src/common/lib/types/message.ts +++ b/src/common/lib/types/message.ts @@ -9,6 +9,30 @@ import * as API from '../../../../ably'; import { IUntypedCryptoStatic } from 'common/types/ICryptoStatic'; import { MsgPack } from 'common/types/msgpack'; +const MessageActionArray: API.MessageAction[] = [ + 'message.unset', + 'message.create', + 'message.update', + 'message.delete', + 'annotation.create', + 'annotation.delete', + 'meta.occupancy', +]; + +const MessageActionMap = new Map(MessageActionArray.map((action, index) => [action, index])); + +const ReverseMessageActionMap = new Map( + MessageActionArray.map((action, index) => [index, action]), +); + +function toMessageActionString(actionNumber: number): API.MessageAction | undefined { + return ReverseMessageActionMap.get(actionNumber); +} + +function toMessageActionNumber(messageAction?: API.MessageAction): number | undefined { + return messageAction ? MessageActionMap.get(messageAction) : undefined; +} + export type CipherOptions = { channelCipher: { encrypt: Function; @@ -82,7 +106,7 @@ export async function fromEncoded( encoded: unknown, inputOptions?: API.ChannelOptions, ): Promise { - const msg = fromValues(encoded); + const msg = fromValues(encoded as Message | Record, { stringifyAction: true }); const options = normalizeCipherOptions(Crypto, logger, inputOptions ?? null); /* if decoding fails at any point, catch and return the message decoded to * the fullest extent possible */ @@ -293,7 +317,7 @@ export async function fromResponseBody( } for (let i = 0; i < body.length; i++) { - const msg = (body[i] = fromValues(body[i])); + const msg = (body[i] = fromValues(body[i], { stringifyAction: true })); try { await decode(msg, options); } catch (e) { @@ -303,14 +327,22 @@ export async function fromResponseBody( return body; } -export function fromValues(values: unknown): Message { +export function fromValues( + values: Message | Record, + options?: { stringifyAction?: boolean }, +): Message { + const stringifyAction = options?.stringifyAction; + if (stringifyAction) { + const action = toMessageActionString(values.action as number) || values.action; + return Object.assign(new Message(), { ...values, action }); + } return Object.assign(new Message(), values); } export function fromValuesArray(values: unknown[]): Message[] { const count = values.length, result = new Array(count); - for (let i = 0; i < count; i++) result[i] = fromValues(values[i]); + for (let i = 0; i < count; i++) result[i] = fromValues(values[i] as Record); return result; } @@ -337,6 +369,13 @@ class Message { encoding?: string | null; extras?: any; size?: number; + action?: API.MessageAction | number; + serial?: string; + refSerial?: string; + refType?: string; + updatedAt?: number; + updateSerial?: string; + operation?: API.Operation; /** * Overload toJSON() to intercept JSON.stringify() @@ -367,6 +406,13 @@ class Message { connectionId: this.connectionId, connectionKey: this.connectionKey, extras: this.extras, + serial: this.serial, + action: toMessageActionNumber(this.action as API.MessageAction) || this.action, + refSerial: this.refSerial, + refType: this.refType, + updatedAt: this.updatedAt, + updateSerial: this.updateSerial, + operation: this.operation, encoding, data, }; @@ -388,6 +434,14 @@ class Message { else result += '; data (json)=' + JSON.stringify(this.data); } if (this.extras) result += '; extras=' + JSON.stringify(this.extras); + + if (this.action) result += '; action=' + this.action; + if (this.serial) result += '; serial=' + this.serial; + if (this.refSerial) result += '; refSerial=' + this.refSerial; + if (this.refType) result += '; refType=' + this.refType; + if (this.updatedAt) result += '; updatedAt=' + this.updatedAt; + if (this.updateSerial) result += '; updateSerial=' + this.updateSerial; + if (this.operation) result += '; operation=' + JSON.stringify(this.operation); result += ']'; return result; } diff --git a/src/common/lib/types/protocolmessage.ts b/src/common/lib/types/protocolmessage.ts index 74a1e64db..b0617cdf8 100644 --- a/src/common/lib/types/protocolmessage.ts +++ b/src/common/lib/types/protocolmessage.ts @@ -103,15 +103,24 @@ export function fromDeserialized( liveObjectsPlugin: typeof LiveObjectsPlugin | null, ): ProtocolMessage { const error = deserialized.error; - if (error) deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); + if (error) { + deserialized.error = ErrorInfo.fromValues(error as ErrorInfo); + } + const messages = deserialized.messages as Message[]; - if (messages) for (let i = 0; i < messages.length; i++) messages[i] = messageFromValues(messages[i]); + if (messages) { + for (let i = 0; i < messages.length; i++) { + messages[i] = messageFromValues(messages[i], { stringifyAction: true }); + } + } const presence = presenceMessagePlugin ? (deserialized.presence as PresenceMessage[]) : undefined; if (presenceMessagePlugin) { - if (presence && presenceMessagePlugin) - for (let i = 0; i < presence.length; i++) + if (presence && presenceMessagePlugin) { + for (let i = 0; i < presence.length; i++) { presence[i] = presenceMessagePlugin.presenceMessageFromValues(presence[i], true); + } + } } let state: LiveObjectsPlugin.StateMessage[] | undefined = undefined; diff --git a/src/platform/react-hooks/src/AblyReactHooks.ts b/src/platform/react-hooks/src/AblyReactHooks.ts index 9874cd8f7..a4aa8b86f 100644 --- a/src/platform/react-hooks/src/AblyReactHooks.ts +++ b/src/platform/react-hooks/src/AblyReactHooks.ts @@ -12,7 +12,7 @@ export type ChannelNameAndOptions = { export type ChannelNameAndAblyId = Pick; export type ChannelParameters = string | ChannelNameAndOptions; -export const version = '2.4.0'; +export const version = '2.5.0'; export function channelOptionsWithAgent(options?: Ably.ChannelOptions) { return { diff --git a/src/platform/react-hooks/src/hooks/usePresence.ts b/src/platform/react-hooks/src/hooks/usePresence.ts index 42c7a6b6f..a322c4785 100644 --- a/src/platform/react-hooks/src/hooks/usePresence.ts +++ b/src/platform/react-hooks/src/hooks/usePresence.ts @@ -73,9 +73,11 @@ export function usePresence( return () => { // here we use the ably.connection.state property, which upon this cleanup function call // will have the current connection state for that connection, thanks to us accessing the Ably instance here by reference. - // if the connection is in one of the inactive states or the channel is not attached, a presence.leave call will produce an exception. - // so we only leave presence in other cases. - if (channel.state === 'attached' && !INACTIVE_CONNECTION_STATES.includes(ably.connection.state)) { + // if the connection is in one of the inactive states or the channel is not attached/attaching, a presence.leave call will produce an exception. + // so we should only leave presence in other cases. + const canLeaveFromConnectionState = !INACTIVE_CONNECTION_STATES.includes(ably.connection.state); + const canLeaveFromChannelState = ['attached', 'attaching'].includes(channel.state); + if (canLeaveFromChannelState && canLeaveFromConnectionState) { channel.presence.leave(); } }; diff --git a/test/common/modules/shared_helper.js b/test/common/modules/shared_helper.js index 38c791247..8cafaef28 100644 --- a/test/common/modules/shared_helper.js +++ b/test/common/modules/shared_helper.js @@ -265,7 +265,7 @@ define([ becomeSuspended(realtime, cb) { const helper = this.addingHelperFunction('becomeSuspended'); - helper._becomeSuspended(realtime, cb); + return helper._becomeSuspended(realtime, cb); } _becomeSuspended(realtime, cb) { @@ -276,10 +276,13 @@ define([ self.recordPrivateApi('call.connectionManager.notifyState'); realtime.connection.connectionManager.notifyState({ state: 'suspended' }); }); - if (cb) + if (cb) { realtime.connection.once('suspended', function () { cb(); }); + } else { + return realtime.connection.once('suspended'); + } } callbackOnClose(realtime, callback) { diff --git a/test/realtime/crypto.test.js b/test/realtime/crypto.test.js index 14cb65330..18df2f11e 100644 --- a/test/realtime/crypto.test.js +++ b/test/realtime/crypto.test.js @@ -395,6 +395,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('call.msgpack.decode'); var messageFromMsgpack = Message.fromValues( msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)), + { stringifyAction: true }, ); try { @@ -439,6 +440,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async helper.recordPrivateApi('call.msgpack.decode'); var messageFromMsgpack = Message.fromValues( msgpack.decode(BufferUtils.base64Decode(msgpackEncodedMessage)), + { stringifyAction: true }, ); try { diff --git a/test/realtime/message.test.js b/test/realtime/message.test.js index f56946a20..15389d7f3 100644 --- a/test/realtime/message.test.js +++ b/test/realtime/message.test.js @@ -4,6 +4,7 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async var expect = chai.expect; let config = Ably.Realtime.Platform.Config; var createPM = Ably.makeProtocolMessageFromDeserialized(); + var Message = Ably.Realtime.Message; var publishIntervalHelper = function (currentMessageNum, channel, dataFn, onPublish) { return function () { @@ -1271,6 +1272,56 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async channel.publish('end', null); }); }); + /** + * @spec TM2j + */ + describe('DefaultMessage.fromValues stringify action', function () { + const testCases = [ + { + description: 'should stringify the numeric action', + action: 1, + options: { stringifyAction: true }, + expectedString: '[Message; action=message.create]', + expectedJSON: { action: 1 }, + }, + { + description: 'should not stringify the numeric action', + action: 1, + options: { stringifyAction: false }, + expectedString: '[Message; action=1]', + expectedJSON: { action: 1 }, + }, + { + description: 'should accept an already stringified action', + action: 'message.update', + options: { stringifyAction: true }, + expectedString: '[Message; action=message.update]', + expectedJSON: { action: 2 }, + }, + { + description: 'should handle no action provided', + action: undefined, + options: { stringifyAction: true }, + expectedString: '[Message]', + expectedJSON: { action: undefined }, + }, + { + description: 'should handle unknown action provided', + action: 10, + options: { stringifyAction: true }, + expectedString: '[Message; action=10]', + expectedJSON: { action: 10 }, + }, + ]; + testCases.forEach(({ description, action, options, expectedString, expectedJSON }) => { + it(description, function () { + const values = { action }; + const message = Message.fromValues(values, options); + expect(message.toString()).to.equal(expectedString); + expect(message.toJSON()).to.deep.contains(expectedJSON); + }); + }); + }); /** * @spec RTS5 diff --git a/test/realtime/presence.test.js b/test/realtime/presence.test.js index 2be0762da..1f52bcbcc 100644 --- a/test/realtime/presence.test.js +++ b/test/realtime/presence.test.js @@ -1,7 +1,7 @@ 'use strict'; define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async, chai) { - var expect = chai.expect; + const { expect, assert } = chai; var createPM = Ably.makeProtocolMessageFromDeserialized(); var PresenceMessage = Ably.Realtime.PresenceMessage; @@ -568,34 +568,20 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * * @spec RTP10 */ - it('presenceEnterAndLeave', function (done) { + it('presenceEnterAndLeave', async function () { const helper = this.test.helper; - var channelName = 'enterAndLeave'; - var enterAndLeave = function (cb) { - var clientRealtime = helper.AblyRealtime({ clientId: testClientId, tokenDetails: authToken }); - clientRealtime.connection.on('connected', function () { - /* get channel, attach, and enter */ - var clientChannel = clientRealtime.channels.get(channelName); - Helper.whenPromiseSettles(clientChannel.attach(), function (err) { - if (err) { - cb(err, clientRealtime); - return; - } - Helper.whenPromiseSettles(clientChannel.presence.enter('Test client data (leave0)'), function (err) { - if (err) { - cb(err, clientRealtime); - return; - } - }); - Helper.whenPromiseSettles(clientChannel.presence.leave(), function (err) { - cb(err, clientRealtime); - }); - }); - }); - helper.monitorConnection(done, clientRealtime); - }; - - runTestWithEventListener(done, helper, channelName, listenerFor('leave'), enterAndLeave); + const channelName = 'enterAndLeave'; + const clientRealtime = helper.AblyRealtime({ clientId: testClientId, tokenDetails: authToken }); + await clientRealtime.connection.whenState('connected'); + /* get channel, attach, and enter */ + const clientChannel = clientRealtime.channels.get(channelName); + await clientChannel.attach(); + await Promise.all([ + clientChannel.presence.enter('Test client data (leave0)'), + clientChannel.presence.subscriptions.once('enter'), + ]); + await Promise.all([clientChannel.presence.leave(), clientChannel.presence.subscriptions.once('leave')]); + clientRealtime.close(); }); /** @@ -1627,117 +1613,120 @@ define(['ably', 'shared_helper', 'async', 'chai'], function (Ably, Helper, async * @spec RTP17g * @specpartial RTP17i - tests simple re-entry, no RESUMED flag test */ - it('presence_auto_reenter', function (done) { + it('presence_auto_reenter', async function () { const helper = this.test.helper; - var channelName = 'presence_auto_reenter'; - var realtime = helper.AblyRealtime(); - var channel = realtime.channels.get(channelName); + const channelName = 'presence_auto_reenter'; + const realtime = helper.AblyRealtime(); + const channel = realtime.channels.get(channelName); + + await realtime.connection.once('connected'); + await channel.attach(); + // presence.get will wait for a sync if needed + await channel.presence.get(); + + const pOnPresence = channel.presence.subscriptions.once('enter'); + await channel.presence.enterClient('one', 'onedata'); + await pOnPresence; + + /* inject an additional member into the myMember set, then force a suspended state */ + helper.recordPrivateApi('read.connectionManager.connectionId'); + const connId = realtime.connection.connectionManager.connectionId; + + helper.recordPrivateApi('call.presence._myMembers.put'); + channel.presence._myMembers.put({ + action: 'enter', + clientId: 'two', + connectionId: connId, + id: connId + ':0:0', + data: 'twodata', + }); - async.series( - [ - function (cb) { - realtime.connection.once('connected', function () { - cb(); - }); - }, - function (cb) { - Helper.whenPromiseSettles(channel.attach(), cb); - }, - function (cb) { - if (!channel.presence.syncComplete) { - helper.recordPrivateApi('call.presence.waitSync'); - channel.presence.members.waitSync(cb); - } else { - cb(); - } - }, - function (cb) { - channel.presence.enterClient('one', 'onedata'); - channel.presence.subscribe('enter', function () { - channel.presence.unsubscribe('enter'); - cb(); - }); - }, - function (cb) { - /* inject an additional member into the myMember set, then force a suspended state */ - helper.recordPrivateApi('read.connectionManager.connectionId'); - var connId = realtime.connection.connectionManager.connectionId; - helper.recordPrivateApi('call.presence._myMembers.put'); - channel.presence._myMembers.put({ - action: 'enter', - clientId: 'two', - connectionId: connId, - id: connId + ':0:0', - data: 'twodata', - }); - helper.becomeSuspended(realtime, cb); - }, - function (cb) { + await helper.becomeSuspended(realtime); + + expect(channel.state).to.equal('suspended', 'sanity-check channel state'); + + /* Reconnect */ + const pOnceAttached = channel.once('attached'); + realtime.connection.connect(); + await pOnceAttached; + + /* Since we haven't been gone for two minutes, we don't know for sure + * that realtime will feel it necessary to do a sync - if it doesn't, + * we request one */ + if (channel.presence.syncComplete) { + helper.recordPrivateApi('call.channel.sync'); + channel.sync(); + } + await channel.presence.get(); + + /* Now just wait for an enter! */ + const enteredMembers = new Set(); + await new Promise((resolve, reject) => { + channel.presence.subscribe('enter', (presmsg) => { + enteredMembers.add(presmsg.clientId); + if (enteredMembers.size === 2) { try { - expect(channel.state).to.equal('suspended', 'sanity-check channel state'); + expect(enteredMembers.has('one')).to.equal(true, 'Check client one entered'); + expect(enteredMembers.has('two')).to.equal(true, 'Check client two entered'); + channel.presence.unsubscribe('enter'); + resolve(); } catch (err) { - cb(err); + reject(err); return; } - /* Reconnect */ - realtime.connection.connect(); - channel.once('attached', function () { - cb(); - }); - }, - function (cb) { - /* Since we haven't been gone for two minutes, we don't know for sure - * that realtime will feel it necessary to do a sync - if it doesn't, - * we request one */ - if (channel.presence.syncComplete) { - helper.recordPrivateApi('call.channel.sync'); - channel.sync(); - } - helper.recordPrivateApi('call.presence.waitSync'); - channel.presence.members.waitSync(cb); - }, - function (cb) { - /* Now just wait for an enter! */ - let enteredMembers = new Set(); - channel.presence.subscribe('enter', function (presmsg) { - enteredMembers.add(presmsg.clientId); - if (enteredMembers.size === 2) { - try { - expect(enteredMembers.has('one')).to.equal(true, 'Check client one entered'); - expect(enteredMembers.has('two')).to.equal(true, 'Check client two entered'); - channel.presence.unsubscribe('enter'); - cb(); - } catch (err) { - cb(err); - return; - } - } - }); - }, - function (cb) { - Helper.whenPromiseSettles(channel.presence.get(), function (err, results) { - if (err) { - cb(err); - return; - } - try { - expect(channel.presence.syncComplete, 'Check in sync').to.be.ok; - expect(results.length).to.equal(3, 'Check correct number of results'); - expect(extractClientIds(results)).deep.to.equal(['one', 'one', 'two'], 'check correct members'); - expect(extractMember(results, 'one').data).to.equal('onedata', 'check correct data on one'); - expect(extractMember(results, 'two').data).to.equal('twodata', 'check correct data on two'); - } catch (err) { - cb(err); - return; - } - cb(); - }); - }, - ], - function (err) { - helper.closeAndFinish(done, realtime, err); - }, - ); + } + }); + }); + + const results = await channel.presence.get(); + expect(channel.presence.syncComplete, 'Check in sync').to.be.ok; + expect(results.length).to.equal(3, 'Check correct number of results'); + expect(extractClientIds(results)).deep.to.equal(['one', 'one', 'two'], 'check correct members'); + expect(extractMember(results, 'one').data).to.equal('onedata', 'check correct data on one'); + expect(extractMember(results, 'two').data).to.equal('twodata', 'check correct data on two'); + realtime.close(); + }); + + /** + * Test the auto-re-enter functionality with a resume failure resulting in a different + * connectionId (the re-entry should not have a message id) + * + * @spec RTP17g + * @spec RTP17g1 + */ + it('presence_auto_reenter_different_connid', async function () { + const helper = this.test.helper; + const channelName = 'presence_auto_reenter_different_connid'; + const realtime = helper.AblyRealtime({ transportParams: { remainPresentFor: 5000 } }); + const channel = realtime.channels.get(channelName); + + await realtime.connection.once('connected'); + const firstConnId = realtime.connection.id; + await channel.attach(); + // presence.get will wait for a sync if needed + await channel.presence.get(); + + const pOnPresence = channel.presence.subscriptions.once('enter'); + await channel.presence.enterClient('one', 'onedata'); + const member1 = await pOnPresence; + + await helper.becomeSuspended(realtime); + assert.equal(channel.state, 'suspended', 'sanity-check channel state'); + + /* Reconnect. Since we were suspended, we will get a different connection id */ + const pOnceAttached = channel.once('attached'); + const pOnEnter = channel.presence.subscriptions.once('enter'); + const pOnLeave = channel.presence.subscriptions.once('leave'); + realtime.connection.connect(); + await pOnceAttached; + const secondConnId = realtime.connection.id; + assert.notEqual(firstConnId, secondConnId, 'sanity-check connection id changed post-suspend'); + const [enter, leave] = await Promise.all([pOnEnter, pOnLeave]); + assert.equal(leave.connectionId, firstConnId, 'Check the leave for the old connid'); + assert.equal(enter.connectionId, secondConnId, 'Check enter for new connid'); + assert.notEqual(enter.id, member1.id, 'Check the new enter did not have the msgId of the original'); + + realtime.close(); }); /** diff --git a/test/support/runPlaywrightTests.js b/test/support/runPlaywrightTests.js index edbcc1531..5ff2d9106 100644 --- a/test/support/runPlaywrightTests.js +++ b/test/support/runPlaywrightTests.js @@ -68,14 +68,20 @@ const runTests = async (browserType) => { // Use page.evaluate to add these functions as event listeners to the 'testLog' and 'testResult' Custom Events. // These events are fired by the custom mocha reporter in playwrightSetup.js - page.evaluate(() => { + const grep = process.env.MOCHA_GREP; + page.evaluate((grep) => { window.addEventListener('testLog', ({ type, detail }) => { onTestLog({ type, detail }); }); window.addEventListener('testResult', ({ type, detail }) => { onTestResult({ type, detail }); }); - }); + // Set grep pattern in the browser context + // allows easy filtering of tests. + if (grep) { + window.mocha.grep(new RegExp(grep)); + } + }, grep); }); }; diff --git a/test/unit/presencemap.test.js b/test/unit/presencemap.test.js new file mode 100644 index 000000000..713669dc4 --- /dev/null +++ b/test/unit/presencemap.test.js @@ -0,0 +1,50 @@ +'use strict'; + +define(['chai', 'ably'], function (chai, Ably) { + const { assert } = chai; + const PresenceMap = Ably.Realtime._PresenceMap; + + class MockRealtimePresence {} + + describe('PresenceMap', () => { + let presenceMap; + + // Helper function to create a presence message + const createPresenceMessage = (clientId, connectionId, action, timestamp) => ({ + clientId, + connectionId, + timestamp, + action, + }); + + beforeEach(() => { + // Initialize with a simple memberKey function that uses clientId as the key + presenceMap = new PresenceMap( + new MockRealtimePresence(), + (item) => item.clientId + ':' + item.connectionId, + (i, j) => i.timestamp > j.timestamp, + ); + }); + + describe('remove()', () => { + it('should return false when no matching member present', () => { + const incoming = createPresenceMessage('client1', 'conn1', 'leave', 100); + assert.isFalse(presenceMap.remove(incoming)); + }); + + it('should return true when removing an (older) matching member', () => { + const original = createPresenceMessage('client1', 'conn1', 'present', 100); + presenceMap.put(original); + const incoming = createPresenceMessage('client1', 'conn1', 'leave', 150); + assert.isTrue(presenceMap.remove(incoming)); + }); + + it('should return false when trying to remove a newer matching member', () => { + const original = createPresenceMessage('client1', 'conn1', 'present', 100); + presenceMap.put(original); + const incoming = createPresenceMessage('client1', 'conn1', 'leave', 50); + assert.isFalse(presenceMap.remove(incoming)); + }); + }); + }); +});