Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/empty-buttons-brake.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"livekit-client": patch
---

Add auto chunking to text streams
23 changes: 11 additions & 12 deletions examples/demo/demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -264,16 +264,15 @@ const appActions = {
room.getParticipantByIdentity(participant?.identity),
);
} else {
for await (const msg of reader) {
handleChatMessage(
{
id: info.id,
timestamp: info.timestamp,
message: msg.collected,
},
room.getParticipantByIdentity(participant?.identity),
);
}
handleChatMessage(
{
id: info.id,
timestamp: info.timestamp,
message: await reader.readAll(),
},
room.getParticipantByIdentity(participant?.identity),
);

appendLog('text stream finished');
}
console.log('final info including close extensions', reader.info);
Expand Down Expand Up @@ -515,7 +514,7 @@ const appActions = {
if (!currentRoom) return;
const textField = <HTMLInputElement>$('entry');
if (textField.value) {
currentRoom.localParticipant.sendText(textField.value, { topic: 'chat' });
currentRoom.localParticipant.sendText(textField.value, { topic: 'lk.chat' });
textField.value = '';
}
},
Expand Down Expand Up @@ -619,7 +618,7 @@ async function sendGreetingTo(participant: Participant) {
const greeting = `Hello new participant ${participant.identity}. This is just an progressively updating chat message from me, participant ${currentRoom?.localParticipant.identity}.`;

const streamWriter = await currentRoom!.localParticipant.streamText({
topic: 'chat',
topic: 'lk.chat',
destinationIdentities: [participant.identity],
});

Expand Down
23 changes: 8 additions & 15 deletions src/room/StreamReader.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import type { DataStream_Chunk } from '@livekit/protocol';
import type { BaseStreamInfo, ByteStreamInfo, TextStreamChunk, TextStreamInfo } from './types';
import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types';
import { bigIntToNumber } from './utils';

abstract class BaseStreamReader<T extends BaseStreamInfo> {
Expand Down Expand Up @@ -124,7 +124,7 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
const decoder = new TextDecoder();

return {
next: async (): Promise<IteratorResult<TextStreamChunk>> => {
next: async (): Promise<IteratorResult<string>> => {
try {
const { done, value } = await reader.read();
if (done) {
Expand All @@ -134,14 +134,7 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {

return {
done: false,
value: {
index: bigIntToNumber(value.chunkIndex),
current: decoder.decode(value.content),
collected: Array.from(this.receivedChunks.values())
.sort((a, b) => bigIntToNumber(a.chunkIndex) - bigIntToNumber(b.chunkIndex))
.map((chunk) => decoder.decode(chunk.content))
.join(''),
},
value: decoder.decode(value.content),
};
}
} catch (error) {
Expand All @@ -150,19 +143,19 @@ export class TextStreamReader extends BaseStreamReader<TextStreamInfo> {
}
},

async return(): Promise<IteratorResult<TextStreamChunk>> {
async return(): Promise<IteratorResult<string>> {
reader.releaseLock();
return { done: true, value: undefined };
},
};
}

async readAll(): Promise<string> {
let latestString: string = '';
for await (const { collected } of this) {
latestString = collected;
let finalString: string = '';
for await (const chunk of this) {
finalString += chunk;
}
return latestString;
return finalString;
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/room/StreamWriter.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import type { BaseStreamInfo, ByteStreamInfo, TextStreamInfo } from './types';

class BaseStreamWriter<T, InfoType extends BaseStreamInfo> {
protected writableStream: WritableStream<[T, number?]>;
protected writableStream: WritableStream<T>;

protected defaultWriter: WritableStreamDefaultWriter<[T, number?]>;
protected defaultWriter: WritableStreamDefaultWriter<T>;

protected onClose?: () => void;

readonly info: InfoType;

constructor(writableStream: WritableStream<[T, number?]>, info: InfoType, onClose?: () => void) {
constructor(writableStream: WritableStream<T>, info: InfoType, onClose?: () => void) {
this.writableStream = writableStream;
this.defaultWriter = writableStream.getWriter();
this.onClose = onClose;
this.info = info;
}

write(chunk: T): Promise<void> {
return this.defaultWriter.write([chunk]);
return this.defaultWriter.write(chunk);
}

async close() {
Expand Down
35 changes: 9 additions & 26 deletions src/room/participant/LocalParticipant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ import {
isWeb,
numberToBigInt,
sleep,
splitUtf8,
supportsAV1,
supportsVP9,
} from '../utils';
Expand Down Expand Up @@ -1536,19 +1537,9 @@ export default class LocalParticipant extends Participant {
attachedStreamIds: fileIds,
});

const textChunkSize = Math.floor(STREAM_CHUNK_SIZE / 4); // utf8 is at most 4 bytes long, so play it safe and take a quarter of the byte size to slice the string
const totalTextChunks = Math.ceil(totalTextLength / textChunkSize);

for (let i = 0; i < totalTextChunks; i++) {
const chunkData = text.slice(
i * textChunkSize,
Math.min((i + 1) * textChunkSize, totalTextLength),
);
await this.engine.waitForBufferStatusLow(DataPacket_Kind.RELIABLE);
await writer.write(chunkData);

handleProgress(Math.ceil((i + 1) / totalTextChunks), 0);
}
await writer.write(text);
// set text part of progress to 1
handleProgress(1, 0);

await writer.close();

Expand Down Expand Up @@ -1614,20 +1605,13 @@ export default class LocalParticipant extends Participant {
let chunkId = 0;
const localP = this;

const writableStream = new WritableStream<[string, number?]>({
const writableStream = new WritableStream<string>({
// Implement the sink
write([textChunk]) {
const textInBytes = new TextEncoder().encode(textChunk);

if (textInBytes.byteLength > STREAM_CHUNK_SIZE) {
this.abort?.();
throw new Error('chunk size too large');
}

return new Promise(async (resolve) => {
async write(text) {
for (const textChunk in splitUtf8(text, STREAM_CHUNK_SIZE)) {
await localP.engine.waitForBufferStatusLow(DataPacket_Kind.RELIABLE);
const chunk = new DataStream_Chunk({
content: textInBytes,
content: new TextEncoder().encode(textChunk),
streamId,
chunkIndex: numberToBigInt(chunkId),
});
Expand All @@ -1641,8 +1625,7 @@ export default class LocalParticipant extends Participant {
await localP.engine.sendDataPacket(chunkPacket, DataPacket_Kind.RELIABLE);

chunkId += 1;
resolve();
});
}
},
async close() {
const trailer = new DataStream_Trailer({
Expand Down
6 changes: 0 additions & 6 deletions src/room/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,3 @@ export interface ByteStreamInfo extends BaseStreamInfo {
}

export interface TextStreamInfo extends BaseStreamInfo {}

export type TextStreamChunk = {
index: number;
current: string;
collected: string;
};
16 changes: 16 additions & 0 deletions src/room/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -625,3 +625,19 @@ export function isLocalParticipant(p: Participant): p is LocalParticipant {
export function isRemoteParticipant(p: Participant): p is RemoteParticipant {
return !p.isLocal;
}

export function splitUtf8(s: string, n: number): string[] {
// adapted from https://stackoverflow.com/a/6043797
const result: string[] = [];
while (s.length > n) {
let k = n;
// Move back to find the start of a UTF-8 character
while ((s.charCodeAt(k) & 0xc0) === 0x80) {
k--;
}
result.push(s.slice(0, k));
s = s.slice(k);
}
result.push(s);
return result;
}
Loading