From b15a42627816825d17c703dcbadea0eb0c52cf5f Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Mon, 3 Jun 2024 19:23:40 -0700 Subject: [PATCH] feat(turbo-stream): delayedUnsubscribe option --- packages/turbo-stream/CHANGELOG.md | 2 + packages/turbo-stream/README.md | 13 ++++ packages/turbo-stream/index.d.ts | 3 + packages/turbo-stream/integration.test.ts | 74 ++++++++++++++++++- packages/turbo-stream/start.js | 4 + .../turbo-stream/stream_source_element.js | 11 ++- 6 files changed, 105 insertions(+), 2 deletions(-) diff --git a/packages/turbo-stream/CHANGELOG.md b/packages/turbo-stream/CHANGELOG.md index 6bd001f..c82e0e6 100644 --- a/packages/turbo-stream/CHANGELOG.md +++ b/packages/turbo-stream/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- Add `delayedUnsubscribe` option. ([@palkan][]) + ## 0.6.0 - Require `@anycable/core` ^0.9.0. diff --git a/packages/turbo-stream/README.md b/packages/turbo-stream/README.md index dd34e2d..421bf27 100644 --- a/packages/turbo-stream/README.md +++ b/packages/turbo-stream/README.md @@ -34,6 +34,19 @@ One subtle but important difference is that **`@anycable/turbo-stream` does not ## Advanced configuration +### Delayed unsubscribe + +When using Turbo Drive for navigation, it's common to have a stream source element attached to the same stream to appear in both old and new HTML. In order to avoid re-subscription to the underlying stream, we can keep the subscription during navigation by postponing the `unsubscribe` call (or more precisely, `channel.disconnect()`). Thus, we can avoid unnecessary Action Cable commands and avoid losing messages arrived in-between resubscription. You must opt-in to use this features: + +```js +import { start } from "@anycable/turbo-stream" +import cable from "cable" + +start(cable, { delayedUnsubscribe: true }) // default is 300ms + +start(cable, { delayedUnsubscribe: 1000 }) // Custom number of milliseconds +``` + ### Attaching `X-Socket-ID` header to Turbo requests You can automatically add a header to all Turbo requests with the current socket session ID. This can be used to perform **broadcasts to others** (see [Rails integration docs](https://docs.anycable.io/rails/getting_started?id=action-cable-extensions)): diff --git a/packages/turbo-stream/index.d.ts b/packages/turbo-stream/index.d.ts index c7f7cf0..0c116d3 100644 --- a/packages/turbo-stream/index.d.ts +++ b/packages/turbo-stream/index.d.ts @@ -13,6 +13,9 @@ export interface StartOptions { tagName: string channelClass: typeof TurboChannel requestSocketIDHeader: boolean | string + // Number of milliseconds to wait before unsubscribing from a channel after the element has been disconnected. + // If set to `true`, the default value (300ms) is used. + delayedUnsubscribe: boolean | number } export const DEFAULT_SOCKET_HEADER: string diff --git a/packages/turbo-stream/integration.test.ts b/packages/turbo-stream/integration.test.ts index ca25ca6..92f5491 100644 --- a/packages/turbo-stream/integration.test.ts +++ b/packages/turbo-stream/integration.test.ts @@ -71,6 +71,7 @@ const broadcast = (stream: string, message: string) => { } afterEach(() => { + document.documentElement.removeAttribute('data-turbo-preview') wss.close() for (let ws of wss.clients) { ws.terminate() @@ -128,7 +129,9 @@ describe('', () => { ` let cable = createCable(`ws://localhost:${port}`) - start(cable, { tagName: 'turbo-cable-stream-source-connected' }) + start(cable, { + tagName: 'turbo-cable-stream-source-connected' + }) let channel = cable.hub.channels[0] expect(channel).not.toBeUndefined() @@ -231,4 +234,73 @@ describe('', () => { (event.detail.fetchOptions.headers as any)['X-TURBO-SOCKET'] ).toEqual('42') }) + + it('with delayedUnsubscribe', async () => { + document.body.innerHTML = ` + + ` + + let cable = createCable(`ws://localhost:${port}`) + start(cable, { + tagName: 'turbo-cable-stream-source-delayed', + delayedUnsubscribe: true + }) + + let channel = cable.hub.channels[0] + expect(channel).not.toBeUndefined() + + await new Promise((resolve, reject) => { + setTimeout(() => { + reject(Error('no connect received')) + }, 2000) + channel.on('connect', resolve) + }) + + await new Promise(resolve => setTimeout(resolve, 300)) + + expect( + document.body + .querySelector('turbo-cable-stream-source-delayed') + ?.getAttribute('connected') + ).toEqual('') + + // One subscribe command was sent + expect(receivedByServer).toHaveLength(1) + + document.body.innerHTML = `
` + + await new Promise(resolve => setTimeout(resolve, 100)) + + document.body.innerHTML = ` + + ` + + await new Promise(resolve => setTimeout(resolve, 100)) + + // No new commands were sent to the server + expect(receivedByServer).toHaveLength(1) + + expect( + document.body + .querySelector('turbo-cable-stream-source-delayed') + ?.getAttribute('connected') + ).toEqual('') + + expect(cable.hub.channels).toHaveLength(2) + channel = cable.hub.channels[1] + + document.body.innerHTML = `
` + + await new Promise((resolve, reject) => { + setTimeout(() => { + reject(Error('no close received')) + }, 1000) + channel.on('close', resolve) + }) + + await new Promise(resolve => setTimeout(resolve, 500)) + + expect(receivedByServer).toHaveLength(2) + expect((receivedByServer[1] as any).command).toEqual('unsubscribe') + }) }) diff --git a/packages/turbo-stream/start.js b/packages/turbo-stream/start.js index 8f938ae..1b9e147 100644 --- a/packages/turbo-stream/start.js +++ b/packages/turbo-stream/start.js @@ -6,11 +6,15 @@ export const DEFAULT_SOCKET_HEADER = 'X-Socket-ID' export function start(cable, opts = {}) { let tagName = opts.tagName || 'turbo-cable-stream-source' let channelClass = opts.channelClass || TurboChannel + let delayedUnsubscribe = opts.delayedUnsubscribe || 0 + + if (delayedUnsubscribe === true) delayedUnsubscribe = 300 let C = class extends TurboStreamSourceElement {} C.cable = cable C.channelClass = channelClass + C.delayedUnsubscribe = delayedUnsubscribe if (customElements.get(tagName) === undefined) { customElements.define(tagName, C) diff --git a/packages/turbo-stream/stream_source_element.js b/packages/turbo-stream/stream_source_element.js index 03e3fc9..903e5a1 100644 --- a/packages/turbo-stream/stream_source_element.js +++ b/packages/turbo-stream/stream_source_element.js @@ -6,6 +6,7 @@ import { isPreview } from './turbo.js' export class TurboStreamSourceElement extends HTMLElement { static cable static channelClass + static delayedUnsubscribe async connectedCallback() { connectStreamSource(this) @@ -48,7 +49,15 @@ export class TurboStreamSourceElement extends HTMLElement { listener() } this.listeners.length = 0 - this.channel.disconnect() + + let ch = this.channel + let delay = this.constructor.delayedUnsubscribe + + if (delay) { + setTimeout(() => ch.disconnect(), delay) + } else { + ch.disconnect() + } } }