From b03e35230052ae672ba6eaa106f0f607d05fea97 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Fri, 20 Dec 2024 15:19:50 -0800 Subject: [PATCH] feat: finalize presence --- README.md | 64 +++++++++++++++- packages/core/action_cable_ext/index.js | 44 ++++------- packages/core/action_cable_ext/index.test.ts | 80 +++++++++++++++++--- packages/core/cable/index.test.ts | 34 ++++++++- packages/core/channel/errors.ts | 4 +- packages/core/channel/index.d.ts | 6 +- packages/core/channel/index.test.ts | 78 +++++++++++++++---- packages/core/channel/presence.d.ts | 21 +++-- packages/core/channel/presence.js | 41 +++++----- packages/core/hub/index.test.ts | 21 +++++ 10 files changed, 303 insertions(+), 90 deletions(-) diff --git a/README.md b/README.md index ba8a611..939ee1c 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ createCable('ws://cable.example.com/my_cable') ### Pub/Sub > [!IMPORTANT] -> This feature is backed by AnyCable _signed streams_ (available since v1.5). See the [documentation](https://docs.anycable.io/edge/anycable-go/signed_streams). +> This feature is backed by AnyCable _signed streams_ (available since v1.5). See the [documentation](https://docs.anycable.io/anycable-go/signed_streams). You can subscribe directly to data streams as follows: @@ -79,6 +79,68 @@ const chatChannel = cable.streamFromSigned(signedName); // ... ``` +### Presence tracking + +> [!IMPORTANT] +> This feature is currently supported only by [AnyCable+](https://plus.anycable.io) and edge version of AnyCable server. See the [documentation](https://docs.anycable.io/edge/anycable-go/presence). + +You can keep track of the users currently connected to the channel. Let's assume you have the following channel: + +```js +const cable = createCable(); +const chatChannel = cable.streamFrom('room/42'); +``` + +To join the channel's presence set, you must explicitly provide the user's information: + +```js +// The first argument must be a unique user identifier within the channel +// and the second argument is an arbitrary user data (presence information) +chatChannel.presence.join(user.id, { name: user.name }) +``` + +You MUST join the presence once, no need to do that on every connection or reconnection—our library takes care of this. + +You can subscribe to presence events: + +```js +chatChannel.presence.on('presence', (ev) => { + const { type, info, id } = ev + + // Type could be 'join', 'leave', 'presence', or 'error' + if (type === 'join') { + console.log("user joined", id, info); + } + + if (type === 'leave') { + // no info, just id + console.log("user left", id); + } +}) +``` + +To obtain the current presence state, you can use the `info` function: + +```js +const users = await chatChannel.presence.info() + +// users is an object with user ids as keys and user data as values +users //=> { 'user-id': { name: 'John' }, ... } +``` + +Calling `presence.info()` performs a server request on the initial invocation (or when necessary) and uses `join` / `leave` events for keeping the +information up-to-date. + +Note that it's not necessary to join the channel to obtain the presence information. + +You can also leave the channel as follows: + +```js +chatChannel.presence.leave() +``` + +The users leaves the channel automatically on unsubscribe or disconnect (in this case, the presense state update might be delayed depending on the server-side configuration). + ### Channels AnyCable client provides multiple ways to subscribe to channels: class-based subscriptions and _headless_ subscriptions. diff --git a/packages/core/action_cable_ext/index.js b/packages/core/action_cable_ext/index.js index 29f609c..113f995 100644 --- a/packages/core/action_cable_ext/index.js +++ b/packages/core/action_cable_ext/index.js @@ -108,40 +108,24 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol { } if (type === 'presence') { - let pending = this.pendingPresence[identifier] + let presenceType = message.type - if (!pending) { - this.logger.warn('unexpected presence response', msg) - return - } - - delete this.pendingPresence[identifier] - - pending.resolve(message) - - return { - type: 'presence', - identifier, - message - } - } + if (presenceType === 'info') { + let pending = this.pendingPresence[identifier] - if (type === 'presence_error') { - let pending = this.pendingPresence[identifier] + if (pending) { + delete this.pendingPresence[identifier] + pending.resolve(message) + } + } else if (presenceType === 'error') { + let pending = this.pendingPresence[identifier] - if (!pending) { - this.logger.warn('unexpected presence response', msg) - return + if (pending) { + delete this.pendingPresence[identifier] + pending.reject(new Error('failed to retrieve presence')) + } } - delete this.pendingPresence[identifier] - - pending.reject(new Error('failed to retrieve presence')) - - return - } - - if (type === 'join' || type === 'leave') { return { type, identifier, @@ -276,7 +260,7 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol { presence(identifier, data) { if (this.pendingPresence[identifier]) { this.logger.warn('presence is already pending, skipping', identifier) - return Promise.reject(Error('Already requesting presence')) + return Promise.reject(Error('presence request is already pending')) } return new Promise((resolve, reject) => { diff --git a/packages/core/action_cable_ext/index.test.ts b/packages/core/action_cable_ext/index.test.ts index 18bcebd..7c77511 100644 --- a/packages/core/action_cable_ext/index.test.ts +++ b/packages/core/action_cable_ext/index.test.ts @@ -3,6 +3,7 @@ import { jest } from '@jest/globals' import { ActionCableExtendedProtocol } from '../index.js' import { TestConsumer } from '../protocol/testing' import { TestLogger } from '../logger/testing' +import { PresenceEvent } from '../channel/presence.js' let cable: TestConsumer let protocol: ActionCableExtendedProtocol @@ -363,27 +364,32 @@ describe('history', () => { describe('presence', () => { let identifier: string - let presenceState: any + let presenceState: PresenceEvent beforeEach(() => { logger.level = 'debug' identifier = JSON.stringify({ channel: 'TestChannel' }) presenceState = { + type: 'info', total: 0, records: [] } }) - it('request presence + success', () => { + it('request presence + success', async () => { let presencePromise = expect( protocol.perform(identifier, '$presence:info') ).resolves.toEqual(presenceState) + let doublePresencePromise = expect( + protocol.perform(identifier, '$presence:info') + ).rejects.toEqual(new Error('presence request is already pending')) + expect( protocol.receive({ type: 'presence', identifier, message: presenceState }) ).toEqual({ type: 'presence', identifier, message: presenceState }) - return presencePromise + await presencePromise }) it('request presence + failure', () => { @@ -392,8 +398,12 @@ describe('presence', () => { ).rejects.toEqual(new Error('failed to retrieve presence')) expect( - protocol.receive({ type: 'presence_error', identifier }) - ).toBeUndefined() + protocol.receive({ + type: 'presence', + identifier, + message: { type: 'error' } + }) + ).toEqual({ type: 'presence', identifier, message: { type: 'error' } }) return presencePromise }) @@ -426,11 +436,15 @@ describe('presence', () => { expect( protocol.receive({ - type: 'join', + type: 'presence', identifier, - message: { id: '42', info: 'vova' } + message: { type: 'join', id: '42', info: 'vova' } }) - ).toEqual({ type: 'join', identifier, message: { id: '42', info: 'vova' } }) + ).toEqual({ + type: 'presence', + identifier, + message: { type: 'join', id: '42', info: 'vova' } + }) }) it('leave', async () => { @@ -439,13 +453,20 @@ describe('presence', () => { expect(cable.mailbox).toHaveLength(1) expect(cable.mailbox[0]).toMatchObject({ command: 'leave', - identifier: 'test_id', - presence: { id: '42' } + identifier: 'test_id' }) expect( - protocol.receive({ type: 'leave', identifier, message: { id: '42' } }) - ).toEqual({ type: 'leave', identifier, message: { id: '42' } }) + protocol.receive({ + type: 'presence', + identifier, + message: { type: 'leave', id: '42' } + }) + ).toEqual({ + type: 'presence', + identifier, + message: { type: 'leave', id: '42' } + }) }) it('restore + presence', async () => { @@ -522,4 +543,39 @@ describe('presence', () => { } }) }) + + it('subscribe + presence + unsubscribe + subscribe', async () => { + identifier = '{"channel":"TestChannel"}' + let subscribePromise = expect( + protocol.subscribe('TestChannel') + ).resolves.toEqual(identifier) + + expect(cable.mailbox).toHaveLength(1) + expect(cable.mailbox[0]).toEqual({ command: 'subscribe', identifier }) + protocol.receive({ type: 'confirm_subscription', identifier }) + await subscribePromise + + await protocol.perform(identifier, '$presence:join', { + id: '42', + info: 'vova' + }) + expect(cable.mailbox).toHaveLength(2) + + await protocol.unsubscribe(identifier) + + cable.mailbox.length = 0 + + // wait for subscribe cooldown + await new Promise(resolve => setTimeout(resolve, 500)) + + let resubscribePromise = expect( + protocol.subscribe('TestChannel') + ).resolves.toEqual(identifier) + + expect(cable.mailbox).toHaveLength(1) + expect(cable.mailbox[0]).toEqual({ + command: 'subscribe', + identifier + }) + }) }) diff --git a/packages/core/cable/index.test.ts b/packages/core/cable/index.test.ts index d24417d..53f3963 100644 --- a/packages/core/cable/index.test.ts +++ b/packages/core/cable/index.test.ts @@ -68,11 +68,12 @@ class TestProtocol implements Protocol { } if (typeof msg === 'object') { - let data = msg as { identifier: string; payload: object } + let data = msg as { identifier: string; payload: object; type?: string } - let { identifier, payload } = data + let { identifier, payload, type } = data return { + type, identifier, message: payload, meta: { id: this.counter.toString() } @@ -1085,6 +1086,35 @@ describe('channels', () => { await promise }) + it('receive events', async () => { + await cable.subscribe(channel).ensureSubscribed() + + expect(cable.hub.size).toEqual(1) + expect(channel.state).toEqual('connected') + + let promise = new Promise((resolve, reject) => { + let tid = setTimeout(() => { + reject(Error('Timed out to receive message')) + }, 500) + + channel.on('info', (msg: InfoEvent) => { + clearTimeout(tid) + expect(msg.data).toEqual('hallo') + resolve() + }) + }) + + transport.receive( + JSON.stringify({ + identifier: 'test_26', + payload: { data: 'hallo' }, + type: 'info' + }) + ) + + await promise + }) + describe('closure and recovery with channels', () => { let channel2: TestChannel let firstError: Promise diff --git a/packages/core/channel/errors.ts b/packages/core/channel/errors.ts index 633cd92..949b13d 100644 --- a/packages/core/channel/errors.ts +++ b/packages/core/channel/errors.ts @@ -46,14 +46,14 @@ ch.on('message', (msg: object, meta: object) => { meta }) -// THROWS Argument of type '"data"' is not assignable to parameter of type 'keyof ChannelEvents' +// THROWS Argument of type '"data"' is not assignable to parameter of type 'keyof ChannelEvents' ch.on('data', (msg: object) => true) interface CustomEvents extends ChannelEvents<{ tupe: number }> { custom: () => void } -// THROWS Type 'CustomEvents' does not satisfy the constraint 'ChannelEvents<{ type: string; }>' +// THROWS Type 'CustomEvents' does not satisfy the constraint 'ChannelEvents<{ type: string; }, string | object>' export class TypedChannel extends Channel<{}, { type: string }, CustomEvents> {} interface ChannelActions { diff --git a/packages/core/channel/index.d.ts b/packages/core/channel/index.d.ts index 8ade6f9..7e45cc5 100644 --- a/packages/core/channel/index.d.ts +++ b/packages/core/channel/index.d.ts @@ -1,7 +1,7 @@ import { Unsubscribe } from 'nanoevents' import { ReasonError } from '../protocol/index.js' -import { Presence, PresenceEvent, PresenceInfo } from './presence.js' +import { Presence, PresenceEvent } from './presence.js' export type Identifier = string @@ -44,9 +44,7 @@ export interface ChannelEvents { close: (event?: ReasonError) => void message: (msg: T, meta?: MessageMeta) => void info: (event: InfoEvent) => void - join: (event: PresenceEvent

) => void - leave: (event: { id: string }) => void - presence: (event: PresenceInfo

) => void + presence: (event: PresenceEvent

) => void } /* eslint-disable @typescript-eslint/no-explicit-any */ diff --git a/packages/core/channel/index.test.ts b/packages/core/channel/index.test.ts index 6fb131f..27005bc 100644 --- a/packages/core/channel/index.test.ts +++ b/packages/core/channel/index.test.ts @@ -377,7 +377,7 @@ describe('receiver communicaton', () => { if (action === '$presence:join') { expect(payload).toMatchObject({ id: '42', info: 'foo' }) } else if (action === '$presence:leave') { - expect(payload).toEqual({ id: '42' }) + expect(payload).toBeUndefined() } else { throw new Error('Unexpected action') } @@ -386,6 +386,10 @@ describe('receiver communicaton', () => { } ) + await channel.presence.leave() + + await channel.presence.join('42', 'foo') + // double-join should be ignored await channel.presence.join('42', 'foo') await channel.presence.leave() }) @@ -411,35 +415,81 @@ describe('receiver communicaton', () => { let info = await channel.presence.info() expect(info).toEqual({ '42': 'foo' }) - let info2 = await channel.presence.info() + info = await channel.presence.info() - expect(info2).toEqual({ '42': 'foo' }) + expect(info).toEqual({ '42': 'foo' }) expect(spy).toHaveBeenCalledTimes(1) // check that leave/join updates the state - channel.emit('join', { id: '44', info: 'bar' }) + channel.emit('presence', { type: 'join', id: '44', info: 'bar' }) - let info3 = await channel.presence.info() - expect(info3).toEqual({ '42': 'foo', '44': 'bar' }) + info = await channel.presence.info() + expect(info).toEqual({ '42': 'foo', '44': 'bar' }) - channel.emit('leave', { id: '42' }) + channel.emit('presence', { type: 'leave', id: '42' }) - let info4 = await channel.presence.info() - expect(info4).toEqual({ '44': 'bar' }) + info = await channel.presence.info() + expect(info).toEqual({ '44': 'bar' }) expect(spy).toHaveBeenCalledTimes(1) channel.presence.reset() - let info5 = await channel.presence.info() - expect(info5).toEqual({ '42': 'foo' }) + channel.emit('presence', { + type: 'info', + records: [{ id: '42', info: 'fu' }], + total: 1 + }) + + info = await channel.presence.info() + expect(info).toEqual({ '42': 'fu' }) + + expect(spy).toHaveBeenCalledTimes(1) + + channel.presence.dispose() + + info = await channel.presence.info() + expect(info).toEqual({ '42': 'foo' }) expect(spy).toHaveBeenCalledTimes(2) - channel.emit('join', { id: '48', info: 'baz' }) + channel.emit('presence', { type: 'join', id: '48', info: 'baz' }) + + info = await channel.presence.info() + expect(info).toEqual({ '42': 'foo', '48': 'baz' }) + }) + + it('info + join/leave + reset', async () => { + client.subscribed(channel) + + let resolver!: (value: any) => void + + let spy = jest + .spyOn(client, 'perform') + .mockImplementation( + (identifier: Identifier, action?: string, payload?: Message) => { + return new Promise(resolve => { + resolver = resolve + }) + } + ) + + let promise = channel.presence.info() + let promise2 = channel.presence.info() + + // Must be ignored while waiting for the response + channel.emit('presence', { type: 'join', id: '44', info: 'bar' }) + + resolver({ + total: 1, + records: [{ id: '42', info: 'foo' }] + }) + + let res = await promise + let res2 = await promise2 - let info6 = await channel.presence.info() - expect(info6).toEqual({ '42': 'foo', '48': 'baz' }) + expect(res).toEqual({ '42': 'foo' }) + expect(res2).toEqual({ '42': 'foo' }) }) }) }) diff --git a/packages/core/channel/presence.d.ts b/packages/core/channel/presence.d.ts index e9838f1..b1a2ec7 100644 --- a/packages/core/channel/presence.d.ts +++ b/packages/core/channel/presence.d.ts @@ -1,20 +1,31 @@ -export type PresenceEvent = { +export type PresenceChangeEvent = { + type?: 'join' | 'leave' id: string - info: T + info?: T } -export type PresenceInfo = { +export type PresenceInfoEvent = { + type: 'info' total: number - records: PresenceEvent[] + records: PresenceChangeEvent[] } +export type PresenceErrorEvent = { + type: 'error' +} + +export type PresenceEvent = + | PresenceChangeEvent + | PresenceInfoEvent + | PresenceErrorEvent + export type PresenceState = { [key: string]: T } export type PresenceChannel = { on(event: string, callback: (msg: PresenceEvent) => void): () => void - perform(action: string, data: object): Promise> + perform(action: string, data: object): Promise> } export class Presence { diff --git a/packages/core/channel/presence.js b/packages/core/channel/presence.js index 0c690a4..f1ba2d0 100644 --- a/packages/core/channel/presence.js +++ b/packages/core/channel/presence.js @@ -3,24 +3,25 @@ export class Presence { constructor(channel) { this.channel = channel this.listeners = [] - this.watching = false } watch() { - if (this.watching) return - - this.watching = true - this.listeners.push( - this.channel.on('join', msg => { - if (!this._state) return + this.channel.on('presence', msg => { + if (msg.type === 'info') { + if (!this._state) { + this._state = this.stateFromInfo(msg) + } + return + } - this._state[msg.id] = msg.info - }), - this.channel.on('leave', msg => { if (!this._state) return - delete this._state[msg.id] + if (msg.type === 'join') { + this._state[msg.id] = msg.info + } else if (msg.type === 'leave') { + delete this._state[msg.id] + } }) ) } @@ -37,8 +38,6 @@ export class Presence { this.listeners.forEach(listener => listener()) this.listeners.length = 0 - - this.watching = false } async join(id, info) { @@ -51,9 +50,7 @@ export class Presence { async leave() { if (!this._info) return undefined - let res = await this.channel.perform('$presence:leave', { - id: this._info.id - }) + let res = await this.channel.perform('$presence:leave') delete this._info @@ -78,14 +75,18 @@ export class Presence { try { let presence = await this.channel.perform('$presence:info', {}) - this._state = presence.records.reduce((acc, { id, info }) => { - acc[id] = info - return acc - }, {}) + this._state = this.stateFromInfo(presence) return this._state } finally { delete this._promise } } + + stateFromInfo(presence) { + return presence.records.reduce((acc, { id, info }) => { + acc[id] = info + return acc + }, {}) + } } diff --git a/packages/core/hub/index.test.ts b/packages/core/hub/index.test.ts index 39d6be6..3461a0d 100644 --- a/packages/core/hub/index.test.ts +++ b/packages/core/hub/index.test.ts @@ -1,4 +1,5 @@ import { CreateSubscriptionOptions } from '.' +import { InfoEvent } from '../channel' import { Hub, Channel, @@ -130,6 +131,26 @@ describe('hub', () => { expect(newChannel.mailbox).toHaveLength(0) }) + it('notify test', () => { + let event: InfoEvent | undefined + channel.on('info', (ev: InfoEvent) => { + event = ev + }) + + subscription = hub.subscriptions.create('a', createOptions()) + subscription.add(channel) + + hub.subscribe('a', 'A') + hub.subscribe('b', 'B') + + hub.notify('A', 'info', { type: 'test', data: 1 }) + hub.notify('a', 'info', { type: 'test', data: 1 }) + hub.notify('B', 'info', { type: 'test', data: 1 }) + + expect(event).toBeDefined() + expect(event).toEqual({ type: 'test', data: 1 }) + }) + it('close', () => { hub.transmit('A', 'hello', { id: '1' }) hub.transmit('B', 'goodbye', { id: '2' })