From e811ef09f6118c69ea337e0b1eb969a663546bde Mon Sep 17 00:00:00 2001 From: ianshade Date: Thu, 21 Mar 2024 13:55:01 +0100 Subject: [PATCH 1/2] fix(vMix): handling XML messages with multi-byte characters the data length that vMix provides is in bytes, not chars; the line remainder has to be stored as a buffer in case the arriving data is fragmented mid-character --- .../vMixResponseStreamReader.spec.ts | 61 ++++++++++++++++++- .../vmix/vMixResponseStreamReader.ts | 27 +++++--- 2 files changed, 78 insertions(+), 10 deletions(-) diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts index 9e919c271..bf499f775 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts @@ -1,6 +1,13 @@ import { VMixResponseStreamReader } from '../vMixResponseStreamReader' describe('VMixResponseStreamReader', () => { + test('the helper uses byte length of strings', () => { + // this is a meta-test for the helper used in unit tests below, to assert that the data length is in bytes (utf-8), not characters + expect(makeXmlMessage('abc')).toBe('XML 18\r\nabc\r\n') + expect(makeXmlMessage('abc¾')).toBe('XML 20\r\nabc¾\r\n') + expect(makeXmlMessage('abc🚀🚀')).toBe('XML 26\r\nabc🚀🚀\r\n') + }) + it('processes a complete message', async () => { const reader = new VMixResponseStreamReader() @@ -231,6 +238,57 @@ describe('VMixResponseStreamReader', () => { ) }) + it('processes a message with data containing multi-byte characters', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = + '27.0.0.49HDC:\\🚀\\preset3¾.vmix' + reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString, + }) + ) + }) + + it('processes a fragmented message with data containing multi-byte characters, split mid-character', async () => { + const reader = new VMixResponseStreamReader() + + const onMessage = jest.fn() + reader.on('response', onMessage) + + const xmlString = '🚀🚀' + const xmlMessage = `XML 23\r\n${xmlString}\r\n` + const fullBuffer = Buffer.from(xmlMessage) + + const firstPart = fullBuffer.slice(0, 16) + const secondPart = fullBuffer.slice(16) + + // sanity check that we did actually split mid-character + expect(firstPart.toString('utf-8') + secondPart.toString('utf-8')).not.toBe(xmlMessage) + + reader.processIncomingData(firstPart) + reader.processIncomingData(secondPart) + + expect(onMessage).toHaveBeenCalledTimes(1) + expect(onMessage).toHaveBeenNthCalledWith( + 1, + expect.objectContaining({ + command: 'XML', + response: 'OK', + body: xmlString, + }) + ) + }) + it('processes a multiline message with data', async () => { // note: I don't know if those can actually be encountered @@ -466,7 +524,8 @@ describe('VMixResponseStreamReader', () => { }) function makeXmlMessage(xmlString: string): string { - return `XML ${xmlString.length + 2}\r\n${xmlString}\r\n` + // the length of the data is in bytes, not characters! + return `XML ${Buffer.byteLength(xmlString, 'utf-8') + 2}\r\n${xmlString}\r\n` } function splitAtIndices(text: string, indices: number[]) { diff --git a/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts index 7657f82b0..0f81a9697 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts @@ -19,24 +19,27 @@ const RESPONSE_REGEX = /^(?\w+)\s+(?OK|ER|\d+)(\s+(? { private _unprocessedLines: string[] = [] - private _lineRemainder = '' + private _lineRemainder = Buffer.alloc(0) reset() { this._unprocessedLines = [] - this._lineRemainder = '' + this._lineRemainder = Buffer.alloc(0) } processIncomingData(data: Buffer) { - const string = this._lineRemainder + data.toString('utf-8') - this._lineRemainder = '' - const lines = string.split('\r\n') - const lastChunk = lines.pop() + const remainingData = Buffer.concat([this._lineRemainder, data]) + const stringData = remainingData.toString('utf-8') + const incomingLines = stringData.split('\r\n') + const lastChunk = incomingLines.pop() if (lastChunk != null && lastChunk !== '') { // Incomplete line found at the end - keep it - this._lineRemainder = lastChunk + const linesByteLength = this.calculatePreSplitByteLength(incomingLines) + this._lineRemainder = remainingData.slice(linesByteLength) + } else { + this._lineRemainder = Buffer.alloc(0) } - this._unprocessedLines.push(...lines) + this._unprocessedLines.push(...incomingLines) let lineToProcess: string | undefined @@ -81,6 +84,12 @@ export class VMixResponseStreamReader extends EventEmitter acc + Buffer.byteLength(str, 'utf-8'), 0) + const additionalBytes = arrayOfStrings.length * 2 + return totalByteLength + additionalBytes + } + private processPayloadData(responseLen: number): string | null { const processedLines: string[] = [] @@ -92,7 +101,7 @@ export class VMixResponseStreamReader extends EventEmitter Date: Fri, 22 Mar 2024 14:46:19 +0100 Subject: [PATCH 2/2] refactor(vMix): rely on stream encoding for chunked characters support After learning what setEncoding does, parts of the previous commit are reverted and the rest of the code is updated to make use of the fact that the 'data' event supplies a string --- .../src/__mocks__/net.ts | 2 +- .../vMixResponseStreamReader.spec.ts | 102 +++++++----------- .../integrations/vmix/__tests__/vmixMock.ts | 3 +- .../src/integrations/vmix/connection.ts | 6 ++ .../vmix/vMixResponseStreamReader.ts | 22 ++-- 5 files changed, 51 insertions(+), 84 deletions(-) diff --git a/packages/timeline-state-resolver/src/__mocks__/net.ts b/packages/timeline-state-resolver/src/__mocks__/net.ts index cd31e5f89..00f94f8ea 100644 --- a/packages/timeline-state-resolver/src/__mocks__/net.ts +++ b/packages/timeline-state-resolver/src/__mocks__/net.ts @@ -67,7 +67,7 @@ export class Socket extends EventEmitter { public mockClose() { this.setClosed() } - public mockData(data: Buffer) { + public mockData(data: string) { this.emit('data', data) } diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts index bf499f775..a071ae971 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vMixResponseStreamReader.spec.ts @@ -14,7 +14,7 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\n') expect(onMessage).toHaveBeenCalledTimes(1) expect(onMessage).toHaveBeenCalledWith( @@ -31,8 +31,8 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) - reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\n') + reader.processIncomingData('FUNCTION OK Take\r\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -58,8 +58,8 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION O')) - reader.processIncomingData(Buffer.from('K 27.0.0.49\r\n')) + reader.processIncomingData('VERSION O') + reader.processIncomingData('K 27.0.0.49\r\n') expect(onMessage).toHaveBeenCalledTimes(1) expect(onMessage).toHaveBeenCalledWith( @@ -76,9 +76,9 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) - reader.processIncomingData(Buffer.from('FUNCTION')) - reader.processIncomingData(Buffer.from(' ER Error message\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\n') + reader.processIncomingData('FUNCTION') + reader.processIncomingData(' ER Error message\r\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -104,11 +104,11 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49')) - reader.processIncomingData(Buffer.from('\r\n')) - reader.processIncomingData(Buffer.from('FUNCTION')) - reader.processIncomingData(Buffer.from(' ER Error message\r')) - reader.processIncomingData(Buffer.from('\n')) + reader.processIncomingData('VERSION OK 27.0.0.49') + reader.processIncomingData('\r\n') + reader.processIncomingData('FUNCTION') + reader.processIncomingData(' ER Error message\r') + reader.processIncomingData('\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -134,7 +134,7 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -160,8 +160,8 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E')) - reader.processIncomingData(Buffer.from('R Error message\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION E') + reader.processIncomingData('R Error message\r\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -187,9 +187,9 @@ describe('VMixResponseStreamReader', () => { const onMessage = jest.fn() reader.on('response', onMessage) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E')) - reader.processIncomingData(Buffer.from('R Error message\r\nFUNCTION OK T')) - reader.processIncomingData(Buffer.from('ake\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION E') + reader.processIncomingData('R Error message\r\nFUNCTION OK T') + reader.processIncomingData('ake\r\n') expect(onMessage).toHaveBeenCalledTimes(3) expect(onMessage).toHaveBeenNthCalledWith( @@ -225,7 +225,7 @@ describe('VMixResponseStreamReader', () => { const xmlString = '27.0.0.49HDC:\\preset.vmix' - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) + reader.processIncomingData(makeXmlMessage(xmlString)) expect(onMessage).toHaveBeenCalledTimes(1) expect(onMessage).toHaveBeenNthCalledWith( @@ -246,37 +246,7 @@ describe('VMixResponseStreamReader', () => { const xmlString = '27.0.0.49HDC:\\🚀\\preset3¾.vmix' - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) - - expect(onMessage).toHaveBeenCalledTimes(1) - expect(onMessage).toHaveBeenNthCalledWith( - 1, - expect.objectContaining({ - command: 'XML', - response: 'OK', - body: xmlString, - }) - ) - }) - - it('processes a fragmented message with data containing multi-byte characters, split mid-character', async () => { - const reader = new VMixResponseStreamReader() - - const onMessage = jest.fn() - reader.on('response', onMessage) - - const xmlString = '🚀🚀' - const xmlMessage = `XML 23\r\n${xmlString}\r\n` - const fullBuffer = Buffer.from(xmlMessage) - - const firstPart = fullBuffer.slice(0, 16) - const secondPart = fullBuffer.slice(16) - - // sanity check that we did actually split mid-character - expect(firstPart.toString('utf-8') + secondPart.toString('utf-8')).not.toBe(xmlMessage) - - reader.processIncomingData(firstPart) - reader.processIncomingData(secondPart) + reader.processIncomingData(makeXmlMessage(xmlString)) expect(onMessage).toHaveBeenCalledTimes(1) expect(onMessage).toHaveBeenNthCalledWith( @@ -299,7 +269,7 @@ describe('VMixResponseStreamReader', () => { const xmlString = '\r\n27.0.0.49\r\nHD\r\nC:\\preset.vmix\r\n\r\n' - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) + reader.processIncomingData(makeXmlMessage(xmlString)) expect(onMessage).toHaveBeenCalledTimes(1) expect(onMessage).toHaveBeenNthCalledWith( @@ -323,7 +293,7 @@ describe('VMixResponseStreamReader', () => { const xmlMessage = makeXmlMessage(xmlString) splitAtIndices(xmlMessage, [2, 10, 25, 40]).forEach((fragment) => { expect(fragment.length).toBeGreaterThan(0) - reader.processIncomingData(Buffer.from(fragment)) + reader.processIncomingData(fragment) }) expect(onMessage).toHaveBeenCalledTimes(1) @@ -347,7 +317,7 @@ describe('VMixResponseStreamReader', () => { '27.0.0.49HDC:\\preset.vmix' const xmlString2 = '25.0.0.14K' - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2))) + reader.processIncomingData(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2)) expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -378,8 +348,8 @@ describe('VMixResponseStreamReader', () => { '27.0.0.49HDC:\\preset.vmix' const xmlString2 = '25.0.0.14K' - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString))) - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2))) + reader.processIncomingData(makeXmlMessage(xmlString)) + reader.processIncomingData(makeXmlMessage(xmlString2)) expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -410,9 +380,9 @@ describe('VMixResponseStreamReader', () => { '27.0.0.49HDC:\\preset.vmix' const xmlString2 = '25.0.0.14K' - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString).substring(0, 44))) + reader.processIncomingData(makeXmlMessage(xmlString).substring(0, 44)) reader.reset() - reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2))) + reader.processIncomingData(makeXmlMessage(xmlString2)) expect(onMessage).toHaveBeenCalledTimes(1) expect(onMessage).toHaveBeenNthCalledWith( @@ -435,8 +405,8 @@ describe('VMixResponseStreamReader', () => { const onError = jest.fn() reader.on('error', onError) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) - reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\n') + reader.processIncomingData('FUNCTION OK Take\r\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -466,9 +436,9 @@ describe('VMixResponseStreamReader', () => { const onError = jest.fn() reader.on('error', onError) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) - reader.processIncomingData(Buffer.from('\r\n')) - reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\n') + reader.processIncomingData('\r\n') + reader.processIncomingData('FUNCTION OK Take\r\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( @@ -498,9 +468,9 @@ describe('VMixResponseStreamReader', () => { const onError = jest.fn() reader.on('error', onError) - reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n')) - reader.processIncomingData(Buffer.from('WASSUP\r\n')) - reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n')) + reader.processIncomingData('VERSION OK 27.0.0.49\r\n') + reader.processIncomingData('WASSUP\r\n') + reader.processIncomingData('FUNCTION OK Take\r\n') expect(onMessage).toHaveBeenCalledTimes(2) expect(onMessage).toHaveBeenNthCalledWith( diff --git a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts index f298246f6..1779c685b 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/__tests__/vmixMock.ts @@ -157,9 +157,8 @@ function buildResponse(command: string, state?: 'OK' | 'ER', dataOrMessage?: str // send every item in the array in a separate `data` event/packet function sendData(socket: net.Socket, response: string[]) { for (const packet of response) { - const dataBuf = Buffer.from(packet, 'utf-8') orgSetImmediate(() => { - socket.mockData(dataBuf) + socket.mockData(packet) }) } } diff --git a/packages/timeline-state-resolver/src/integrations/vmix/connection.ts b/packages/timeline-state-resolver/src/integrations/vmix/connection.ts index 4efae0d50..aaa3e8fbf 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/connection.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/connection.ts @@ -114,6 +114,12 @@ export class BaseConnection extends EventEmitter { this._socket.setEncoding('utf-8') this._socket.on('data', (data) => { + if (typeof data !== 'string') { + // this is against the types, but according to the docs the data will be a string + // the problem of a character split into chunks in transit should be taken care of + // (https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_readable_setencoding_encoding) + throw new Error('Received a non-string even though encoding should have been set to utf-8') + } this._responseStreamReader.processIncomingData(data) }) this._socket.on('connect', () => { diff --git a/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts index 0f81a9697..0f3fbfbaa 100644 --- a/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts +++ b/packages/timeline-state-resolver/src/integrations/vmix/vMixResponseStreamReader.ts @@ -19,25 +19,23 @@ const RESPONSE_REGEX = /^(?\w+)\s+(?OK|ER|\d+)(\s+(? { private _unprocessedLines: string[] = [] - private _lineRemainder = Buffer.alloc(0) + private _lineRemainder = '' reset() { this._unprocessedLines = [] - this._lineRemainder = Buffer.alloc(0) + this._lineRemainder = '' } - processIncomingData(data: Buffer) { - const remainingData = Buffer.concat([this._lineRemainder, data]) - const stringData = remainingData.toString('utf-8') - const incomingLines = stringData.split('\r\n') + processIncomingData(data: string) { + const remainingData = this._lineRemainder + data + const incomingLines = remainingData.split('\r\n') const lastChunk = incomingLines.pop() if (lastChunk != null && lastChunk !== '') { // Incomplete line found at the end - keep it - const linesByteLength = this.calculatePreSplitByteLength(incomingLines) - this._lineRemainder = remainingData.slice(linesByteLength) + this._lineRemainder = lastChunk } else { - this._lineRemainder = Buffer.alloc(0) + this._lineRemainder = '' } this._unprocessedLines.push(...incomingLines) @@ -84,12 +82,6 @@ export class VMixResponseStreamReader extends EventEmitter acc + Buffer.byteLength(str, 'utf-8'), 0) - const additionalBytes = arrayOfStrings.length * 2 - return totalByteLength + additionalBytes - } - private processPayloadData(responseLen: number): string | null { const processedLines: string[] = []