From ea0d209d4429c39dad27b13596122f1cb0299a13 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Sun, 16 Feb 2025 09:08:15 +0100 Subject: [PATCH 1/3] add text auto chunking to text streams --- examples/demo/demo.ts | 23 +++++++++++------------ src/room/StreamReader.ts | 23 ++++++++--------------- src/room/StreamWriter.ts | 8 ++++---- src/room/participant/LocalParticipant.ts | 19 ++++++------------- src/room/types.ts | 6 ------ src/room/utils.ts | 16 ++++++++++++++++ 6 files changed, 45 insertions(+), 50 deletions(-) diff --git a/examples/demo/demo.ts b/examples/demo/demo.ts index e4dbb5896d..d7982e5993 100644 --- a/examples/demo/demo.ts +++ b/examples/demo/demo.ts @@ -264,16 +264,15 @@ const appActions = { room.getParticipantByIdentity(participant?.identity), ); } else { - for await (const msg of reader) { - handleChatMessage( - { - id: info.id, - timestamp: info.timestamp, - message: msg.collected, - }, - room.getParticipantByIdentity(participant?.identity), - ); - } + handleChatMessage( + { + id: info.id, + timestamp: info.timestamp, + message: await reader.readAll(), + }, + room.getParticipantByIdentity(participant?.identity), + ); + appendLog('text stream finished'); } console.log('final info including close extensions', reader.info); @@ -515,7 +514,7 @@ const appActions = { if (!currentRoom) return; const textField = $('entry'); if (textField.value) { - currentRoom.localParticipant.sendText(textField.value, { topic: 'chat' }); + currentRoom.localParticipant.sendText(textField.value, { topic: 'lk.chat' }); textField.value = ''; } }, @@ -619,7 +618,7 @@ async function sendGreetingTo(participant: Participant) { const greeting = `Hello new participant ${participant.identity}. This is just an progressively updating chat message from me, participant ${currentRoom?.localParticipant.identity}.`; const streamWriter = await currentRoom!.localParticipant.streamText({ - topic: 'chat', + topic: 'lk.chat', destinationIdentities: [participant.identity], }); diff --git a/src/room/StreamReader.ts b/src/room/StreamReader.ts index 9a52ec68a5..f72f23faff 100644 --- a/src/room/StreamReader.ts +++ b/src/room/StreamReader.ts @@ -1,5 +1,5 @@ import type { DataStream_Chunk } from '@livekit/protocol'; -import type { BaseStreamInfo, ByteStreamInfo, TextStreamChunk, TextStreamInfo } from './types'; +import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types'; import { bigIntToNumber } from './utils'; abstract class BaseStreamReader { @@ -124,7 +124,7 @@ export class TextStreamReader extends BaseStreamReader { const decoder = new TextDecoder(); return { - next: async (): Promise> => { + next: async (): Promise> => { try { const { done, value } = await reader.read(); if (done) { @@ -134,14 +134,7 @@ export class TextStreamReader extends BaseStreamReader { return { done: false, - value: { - index: bigIntToNumber(value.chunkIndex), - current: decoder.decode(value.content), - collected: Array.from(this.receivedChunks.values()) - .sort((a, b) => bigIntToNumber(a.chunkIndex) - bigIntToNumber(b.chunkIndex)) - .map((chunk) => decoder.decode(chunk.content)) - .join(''), - }, + value: decoder.decode(value.content), }; } } catch (error) { @@ -150,7 +143,7 @@ export class TextStreamReader extends BaseStreamReader { } }, - async return(): Promise> { + async return(): Promise> { reader.releaseLock(); return { done: true, value: undefined }; }, @@ -158,11 +151,11 @@ export class TextStreamReader extends BaseStreamReader { } async readAll(): Promise { - let latestString: string = ''; - for await (const { collected } of this) { - latestString = collected; + let finalString: string = ''; + for await (const chunk of this) { + finalString += chunk; } - return latestString; + return finalString; } } diff --git a/src/room/StreamWriter.ts b/src/room/StreamWriter.ts index 7312a22487..88baaf7a2b 100644 --- a/src/room/StreamWriter.ts +++ b/src/room/StreamWriter.ts @@ -1,15 +1,15 @@ import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types'; class BaseStreamWriter { - protected writableStream: WritableStream<[T, number?]>; + protected writableStream: WritableStream; - protected defaultWriter: WritableStreamDefaultWriter<[T, number?]>; + protected defaultWriter: WritableStreamDefaultWriter; protected onClose?: () => void; readonly info: InfoType; - constructor(writableStream: WritableStream<[T, number?]>, info: InfoType, onClose?: () => void) { + constructor(writableStream: WritableStream, info: InfoType, onClose?: () => void) { this.writableStream = writableStream; this.defaultWriter = writableStream.getWriter(); this.onClose = onClose; @@ -17,7 +17,7 @@ class BaseStreamWriter { } write(chunk: T): Promise { - return this.defaultWriter.write([chunk]); + return this.defaultWriter.write(chunk); } async close() { diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 3daad8126c..98a5dad4d0 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -90,6 +90,7 @@ import { isWeb, numberToBigInt, sleep, + splitUtf8, supportsAV1, supportsVP9, } from '../utils'; @@ -1614,20 +1615,13 @@ export default class LocalParticipant extends Participant { let chunkId = 0; const localP = this; - const writableStream = new WritableStream<[string, number?]>({ + const writableStream = new WritableStream({ // Implement the sink - write([textChunk]) { - const textInBytes = new TextEncoder().encode(textChunk); - - if (textInBytes.byteLength > STREAM_CHUNK_SIZE) { - this.abort?.(); - throw new Error('chunk size too large'); - } - - return new Promise(async (resolve) => { + async write(text) { + for (const textChunk in splitUtf8(text, STREAM_CHUNK_SIZE)) { await localP.engine.waitForBufferStatusLow(DataPacket_Kind.RELIABLE); const chunk = new DataStream_Chunk({ - content: textInBytes, + content: new TextEncoder().encode(textChunk), streamId, chunkIndex: numberToBigInt(chunkId), }); @@ -1641,8 +1635,7 @@ export default class LocalParticipant extends Participant { await localP.engine.sendDataPacket(chunkPacket, DataPacket_Kind.RELIABLE); chunkId += 1; - resolve(); - }); + } }, async close() { const trailer = new DataStream_Trailer({ diff --git a/src/room/types.ts b/src/room/types.ts index 7a4a7cd2c1..34aaf14109 100644 --- a/src/room/types.ts +++ b/src/room/types.ts @@ -119,9 +119,3 @@ export interface ByteStreamInfo extends BaseStreamInfo { } export interface TextStreamInfo extends BaseStreamInfo {} - -export type TextStreamChunk = { - index: number; - current: string; - collected: string; -}; diff --git a/src/room/utils.ts b/src/room/utils.ts index 2f0fff4701..9310265ec6 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -625,3 +625,19 @@ export function isLocalParticipant(p: Participant): p is LocalParticipant { export function isRemoteParticipant(p: Participant): p is RemoteParticipant { return !p.isLocal; } + +export function splitUtf8(s: string, n: number): string[] { + // adapted from https://stackoverflow.com/a/6043797 + const result: string[] = []; + while (s.length > n) { + let k = n; + // Move back to find the start of a UTF-8 character + while ((s.charCodeAt(k) & 0xc0) === 0x80) { + k--; + } + result.push(s.slice(0, k)); + s = s.slice(k); + } + result.push(s); + return result; +} From ee430239ed70d30d8fad7afcebd7220936d600da Mon Sep 17 00:00:00 2001 From: lukasIO Date: Sun, 16 Feb 2025 09:14:30 +0100 Subject: [PATCH 2/3] simplify --- src/room/participant/LocalParticipant.ts | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index 98a5dad4d0..8fe603f482 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -1537,19 +1537,9 @@ export default class LocalParticipant extends Participant { attachedStreamIds: fileIds, }); - const textChunkSize = Math.floor(STREAM_CHUNK_SIZE / 4); // utf8 is at most 4 bytes long, so play it safe and take a quarter of the byte size to slice the string - const totalTextChunks = Math.ceil(totalTextLength / textChunkSize); - - for (let i = 0; i < totalTextChunks; i++) { - const chunkData = text.slice( - i * textChunkSize, - Math.min((i + 1) * textChunkSize, totalTextLength), - ); - await this.engine.waitForBufferStatusLow(DataPacket_Kind.RELIABLE); - await writer.write(chunkData); - - handleProgress(Math.ceil((i + 1) / totalTextChunks), 0); - } + await writer.write(text); + // set text part of progress to 1 + handleProgress(1, 0); await writer.close(); From b804838a4ffc61cd8880f1b45c322b3c8150265c Mon Sep 17 00:00:00 2001 From: lukasIO Date: Mon, 17 Feb 2025 08:22:05 +0100 Subject: [PATCH 3/3] Create empty-buttons-brake.md --- .changeset/empty-buttons-brake.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/empty-buttons-brake.md diff --git a/.changeset/empty-buttons-brake.md b/.changeset/empty-buttons-brake.md new file mode 100644 index 0000000000..e07f7283c3 --- /dev/null +++ b/.changeset/empty-buttons-brake.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Add auto chunking to text streams