Skip to content

Commit

Permalink
feat: finalize presence
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Dec 20, 2024
1 parent 4b56667 commit b03e352
Show file tree
Hide file tree
Showing 10 changed files with 303 additions and 90 deletions.
64 changes: 63 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ createCable('ws://cable.example.com/my_cable')
### Pub/Sub

> [!IMPORTANT]
> This feature is backed by AnyCable _signed streams_ (available since v1.5). See the [documentation](https://docs.anycable.io/edge/anycable-go/signed_streams).
> This feature is backed by AnyCable _signed streams_ (available since v1.5). See the [documentation](https://docs.anycable.io/anycable-go/signed_streams).
You can subscribe directly to data streams as follows:

Expand All @@ -79,6 +79,68 @@ const chatChannel = cable.streamFromSigned(signedName);
// ...
```

### Presence tracking

> [!IMPORTANT]
> This feature is currently supported only by [AnyCable+](https://plus.anycable.io) and edge version of AnyCable server. See the [documentation](https://docs.anycable.io/edge/anycable-go/presence).
You can keep track of the users currently connected to the channel. Let's assume you have the following channel:

```js
const cable = createCable();
const chatChannel = cable.streamFrom('room/42');
```

To join the channel's presence set, you must explicitly provide the user's information:

```js
// The first argument must be a unique user identifier within the channel
// and the second argument is an arbitrary user data (presence information)
chatChannel.presence.join(user.id, { name: user.name })
```

You MUST join the presence once, no need to do that on every connection or reconnection—our library takes care of this.

You can subscribe to presence events:

```js
chatChannel.presence.on('presence', (ev) => {
const { type, info, id } = ev

// Type could be 'join', 'leave', 'presence', or 'error'
if (type === 'join') {
console.log("user joined", id, info);
}

if (type === 'leave') {
// no info, just id
console.log("user left", id);
}
})
```

To obtain the current presence state, you can use the `info` function:

```js
const users = await chatChannel.presence.info()

// users is an object with user ids as keys and user data as values
users //=> { 'user-id': { name: 'John' }, ... }
```

Calling `presence.info()` performs a server request on the initial invocation (or when necessary) and uses `join` / `leave` events for keeping the
information up-to-date.

Note that it's not necessary to join the channel to obtain the presence information.

You can also leave the channel as follows:

```js
chatChannel.presence.leave()
```

The users leaves the channel automatically on unsubscribe or disconnect (in this case, the presense state update might be delayed depending on the server-side configuration).

### Channels

AnyCable client provides multiple ways to subscribe to channels: class-based subscriptions and _headless_ subscriptions.
Expand Down
44 changes: 14 additions & 30 deletions packages/core/action_cable_ext/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,40 +108,24 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol {
}

if (type === 'presence') {
let pending = this.pendingPresence[identifier]
let presenceType = message.type

if (!pending) {
this.logger.warn('unexpected presence response', msg)
return
}

delete this.pendingPresence[identifier]

pending.resolve(message)

return {
type: 'presence',
identifier,
message
}
}
if (presenceType === 'info') {
let pending = this.pendingPresence[identifier]

if (type === 'presence_error') {
let pending = this.pendingPresence[identifier]
if (pending) {
delete this.pendingPresence[identifier]
pending.resolve(message)
}
} else if (presenceType === 'error') {
let pending = this.pendingPresence[identifier]

if (!pending) {
this.logger.warn('unexpected presence response', msg)
return
if (pending) {
delete this.pendingPresence[identifier]
pending.reject(new Error('failed to retrieve presence'))
}
}

delete this.pendingPresence[identifier]

pending.reject(new Error('failed to retrieve presence'))

return
}

if (type === 'join' || type === 'leave') {
return {
type,
identifier,
Expand Down Expand Up @@ -276,7 +260,7 @@ export class ActionCableExtendedProtocol extends ActionCableProtocol {
presence(identifier, data) {
if (this.pendingPresence[identifier]) {
this.logger.warn('presence is already pending, skipping', identifier)
return Promise.reject(Error('Already requesting presence'))
return Promise.reject(Error('presence request is already pending'))
}

return new Promise((resolve, reject) => {
Expand Down
80 changes: 68 additions & 12 deletions packages/core/action_cable_ext/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { jest } from '@jest/globals'
import { ActionCableExtendedProtocol } from '../index.js'
import { TestConsumer } from '../protocol/testing'
import { TestLogger } from '../logger/testing'
import { PresenceEvent } from '../channel/presence.js'

let cable: TestConsumer
let protocol: ActionCableExtendedProtocol
Expand Down Expand Up @@ -363,27 +364,32 @@ describe('history', () => {

describe('presence', () => {
let identifier: string
let presenceState: any
let presenceState: PresenceEvent<string>

beforeEach(() => {
logger.level = 'debug'
identifier = JSON.stringify({ channel: 'TestChannel' })
presenceState = {
type: 'info',
total: 0,
records: []
}
})

it('request presence + success', () => {
it('request presence + success', async () => {
let presencePromise = expect(
protocol.perform(identifier, '$presence:info')
).resolves.toEqual(presenceState)

let doublePresencePromise = expect(
protocol.perform(identifier, '$presence:info')
).rejects.toEqual(new Error('presence request is already pending'))

expect(
protocol.receive({ type: 'presence', identifier, message: presenceState })
).toEqual({ type: 'presence', identifier, message: presenceState })

return presencePromise
await presencePromise
})

it('request presence + failure', () => {
Expand All @@ -392,8 +398,12 @@ describe('presence', () => {
).rejects.toEqual(new Error('failed to retrieve presence'))

expect(
protocol.receive({ type: 'presence_error', identifier })
).toBeUndefined()
protocol.receive({
type: 'presence',
identifier,
message: { type: 'error' }
})
).toEqual({ type: 'presence', identifier, message: { type: 'error' } })

return presencePromise
})
Expand Down Expand Up @@ -426,11 +436,15 @@ describe('presence', () => {

expect(
protocol.receive({
type: 'join',
type: 'presence',
identifier,
message: { id: '42', info: 'vova' }
message: { type: 'join', id: '42', info: 'vova' }
})
).toEqual({ type: 'join', identifier, message: { id: '42', info: 'vova' } })
).toEqual({
type: 'presence',
identifier,
message: { type: 'join', id: '42', info: 'vova' }
})
})

it('leave', async () => {
Expand All @@ -439,13 +453,20 @@ describe('presence', () => {
expect(cable.mailbox).toHaveLength(1)
expect(cable.mailbox[0]).toMatchObject({
command: 'leave',
identifier: 'test_id',
presence: { id: '42' }
identifier: 'test_id'
})

expect(
protocol.receive({ type: 'leave', identifier, message: { id: '42' } })
).toEqual({ type: 'leave', identifier, message: { id: '42' } })
protocol.receive({
type: 'presence',
identifier,
message: { type: 'leave', id: '42' }
})
).toEqual({
type: 'presence',
identifier,
message: { type: 'leave', id: '42' }
})
})

it('restore + presence', async () => {
Expand Down Expand Up @@ -522,4 +543,39 @@ describe('presence', () => {
}
})
})

it('subscribe + presence + unsubscribe + subscribe', async () => {
identifier = '{"channel":"TestChannel"}'
let subscribePromise = expect(
protocol.subscribe('TestChannel')
).resolves.toEqual(identifier)

expect(cable.mailbox).toHaveLength(1)
expect(cable.mailbox[0]).toEqual({ command: 'subscribe', identifier })
protocol.receive({ type: 'confirm_subscription', identifier })
await subscribePromise

await protocol.perform(identifier, '$presence:join', {
id: '42',
info: 'vova'
})
expect(cable.mailbox).toHaveLength(2)

await protocol.unsubscribe(identifier)

cable.mailbox.length = 0

// wait for subscribe cooldown
await new Promise(resolve => setTimeout(resolve, 500))

let resubscribePromise = expect(
protocol.subscribe('TestChannel')
).resolves.toEqual(identifier)

expect(cable.mailbox).toHaveLength(1)
expect(cable.mailbox[0]).toEqual({
command: 'subscribe',
identifier
})
})
})
34 changes: 32 additions & 2 deletions packages/core/cable/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,11 +68,12 @@ class TestProtocol implements Protocol {
}

if (typeof msg === 'object') {
let data = msg as { identifier: string; payload: object }
let data = msg as { identifier: string; payload: object; type?: string }

let { identifier, payload } = data
let { identifier, payload, type } = data

return {
type,
identifier,
message: payload,
meta: { id: this.counter.toString() }
Expand Down Expand Up @@ -1085,6 +1086,35 @@ describe('channels', () => {
await promise
})

it('receive events', async () => {
await cable.subscribe(channel).ensureSubscribed()

expect(cable.hub.size).toEqual(1)
expect(channel.state).toEqual('connected')

let promise = new Promise<void>((resolve, reject) => {
let tid = setTimeout(() => {
reject(Error('Timed out to receive message'))
}, 500)

channel.on('info', (msg: InfoEvent) => {
clearTimeout(tid)
expect(msg.data).toEqual('hallo')
resolve()
})
})

transport.receive(
JSON.stringify({
identifier: 'test_26',
payload: { data: 'hallo' },
type: 'info'
})
)

await promise
})

describe('closure and recovery with channels', () => {
let channel2: TestChannel
let firstError: Promise<void>
Expand Down
4 changes: 2 additions & 2 deletions packages/core/channel/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,14 +46,14 @@ ch.on('message', (msg: object, meta: object) => {
meta
})

// THROWS Argument of type '"data"' is not assignable to parameter of type 'keyof ChannelEvents<Message>'
// THROWS Argument of type '"data"' is not assignable to parameter of type 'keyof ChannelEvents<Message, string | object>'
ch.on('data', (msg: object) => true)

interface CustomEvents extends ChannelEvents<{ tupe: number }> {
custom: () => void
}

// THROWS Type 'CustomEvents' does not satisfy the constraint 'ChannelEvents<{ type: string; }>'
// THROWS Type 'CustomEvents' does not satisfy the constraint 'ChannelEvents<{ type: string; }, string | object>'
export class TypedChannel extends Channel<{}, { type: string }, CustomEvents> {}

interface ChannelActions {
Expand Down
6 changes: 2 additions & 4 deletions packages/core/channel/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Unsubscribe } from 'nanoevents'

import { ReasonError } from '../protocol/index.js'
import { Presence, PresenceEvent, PresenceInfo } from './presence.js'
import { Presence, PresenceEvent } from './presence.js'

export type Identifier = string

Expand Down Expand Up @@ -44,9 +44,7 @@ export interface ChannelEvents<T, P = object | string> {
close: (event?: ReasonError) => void
message: (msg: T, meta?: MessageMeta) => void
info: (event: InfoEvent) => void
join: (event: PresenceEvent<P>) => void
leave: (event: { id: string }) => void
presence: (event: PresenceInfo<P>) => void
presence: (event: PresenceEvent<P>) => void
}

/* eslint-disable @typescript-eslint/no-explicit-any */
Expand Down
Loading

0 comments on commit b03e352

Please sign in to comment.