Skip to content

Commit

Permalink
Merge pull request #325 from tv2norge-collab/fix/EAV-209/vMix-data-le…
Browse files Browse the repository at this point in the history
…ngth

fix(vMix): handling XML messages with multi-byte characters
  • Loading branch information
jstarpl authored Mar 25, 2024
2 parents b8d20fa + bf4df13 commit 9b13e3a
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 46 deletions.
2 changes: 1 addition & 1 deletion packages/timeline-state-resolver/src/__mocks__/net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class Socket extends EventEmitter {
public mockClose() {
this.setClosed()
}
public mockData(data: Buffer) {
public mockData(data: string) {
this.emit('data', data)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
import { VMixResponseStreamReader } from '../vMixResponseStreamReader'

describe('VMixResponseStreamReader', () => {
test('the helper uses byte length of strings', () => {
// this is a meta-test for the helper used in unit tests below, to assert that the data length is in bytes (utf-8), not characters
expect(makeXmlMessage('<vmix>abc</vmix>')).toBe('XML 18\r\n<vmix>abc</vmix>\r\n')
expect(makeXmlMessage('<vmix>abc¾</vmix>')).toBe('XML 20\r\n<vmix>abc¾</vmix>\r\n')
expect(makeXmlMessage('<vmix>abc🚀🚀</vmix>')).toBe('XML 26\r\n<vmix>abc🚀🚀</vmix>\r\n')
})

it('processes a complete message', async () => {
const reader = new VMixResponseStreamReader()

const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenCalledWith(
Expand All @@ -24,8 +31,8 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -51,8 +58,8 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION O'))
reader.processIncomingData(Buffer.from('K 27.0.0.49\r\n'))
reader.processIncomingData('VERSION O')
reader.processIncomingData('K 27.0.0.49\r\n')

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenCalledWith(
Expand All @@ -69,9 +76,9 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION'))
reader.processIncomingData(Buffer.from(' ER Error message\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('FUNCTION')
reader.processIncomingData(' ER Error message\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -97,11 +104,11 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49'))
reader.processIncomingData(Buffer.from('\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION'))
reader.processIncomingData(Buffer.from(' ER Error message\r'))
reader.processIncomingData(Buffer.from('\n'))
reader.processIncomingData('VERSION OK 27.0.0.49')
reader.processIncomingData('\r\n')
reader.processIncomingData('FUNCTION')
reader.processIncomingData(' ER Error message\r')
reader.processIncomingData('\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -127,7 +134,7 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -153,8 +160,8 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E'))
reader.processIncomingData(Buffer.from('R Error message\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION E')
reader.processIncomingData('R Error message\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -180,9 +187,9 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E'))
reader.processIncomingData(Buffer.from('R Error message\r\nFUNCTION OK T'))
reader.processIncomingData(Buffer.from('ake\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION E')
reader.processIncomingData('R Error message\r\nFUNCTION OK T')
reader.processIncomingData('ake\r\n')

expect(onMessage).toHaveBeenCalledTimes(3)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -218,7 +225,28 @@ describe('VMixResponseStreamReader', () => {

const xmlString =
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
reader.processIncomingData(makeXmlMessage(xmlString))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
command: 'XML',
response: 'OK',
body: xmlString,
})
)
})

it('processes a message with data containing multi-byte characters', async () => {
const reader = new VMixResponseStreamReader()

const onMessage = jest.fn()
reader.on('response', onMessage)

const xmlString =
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\🚀\\preset3¾.vmix</preset><inputs><inputs></vmix>'
reader.processIncomingData(makeXmlMessage(xmlString))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -241,7 +269,7 @@ describe('VMixResponseStreamReader', () => {

const xmlString =
'<vmix>\r\n<version>27.0.0.49</version>\r\n<edition>HD</edition>\r\n<preset>C:\\preset.vmix</preset>\r\n<inputs><inputs>\r\n</vmix>'
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
reader.processIncomingData(makeXmlMessage(xmlString))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -265,7 +293,7 @@ describe('VMixResponseStreamReader', () => {
const xmlMessage = makeXmlMessage(xmlString)
splitAtIndices(xmlMessage, [2, 10, 25, 40]).forEach((fragment) => {
expect(fragment.length).toBeGreaterThan(0)
reader.processIncomingData(Buffer.from(fragment))
reader.processIncomingData(fragment)
})

expect(onMessage).toHaveBeenCalledTimes(1)
Expand All @@ -289,7 +317,7 @@ describe('VMixResponseStreamReader', () => {
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
const xmlString2 = '<vmix><version>25.0.0.1</version><edition>4K</edition><inputs><inputs></vmix>'

reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2)))
reader.processIncomingData(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2))

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -320,8 +348,8 @@ describe('VMixResponseStreamReader', () => {
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
const xmlString2 = '<vmix><version>25.0.0.1</version><edition>4K</edition><inputs><inputs></vmix>'

reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2)))
reader.processIncomingData(makeXmlMessage(xmlString))
reader.processIncomingData(makeXmlMessage(xmlString2))

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -352,9 +380,9 @@ describe('VMixResponseStreamReader', () => {
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
const xmlString2 = '<vmix><version>25.0.0.1</version><edition>4K</edition><inputs><inputs></vmix>'

reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString).substring(0, 44)))
reader.processIncomingData(makeXmlMessage(xmlString).substring(0, 44))
reader.reset()
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2)))
reader.processIncomingData(makeXmlMessage(xmlString2))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -377,8 +405,8 @@ describe('VMixResponseStreamReader', () => {
const onError = jest.fn()
reader.on('error', onError)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -408,9 +436,9 @@ describe('VMixResponseStreamReader', () => {
const onError = jest.fn()
reader.on('error', onError)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -440,9 +468,9 @@ describe('VMixResponseStreamReader', () => {
const onError = jest.fn()
reader.on('error', onError)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('WASSUP\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('WASSUP\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -466,7 +494,8 @@ describe('VMixResponseStreamReader', () => {
})

function makeXmlMessage(xmlString: string): string {
return `XML ${xmlString.length + 2}\r\n${xmlString}\r\n`
// the length of the data is in bytes, not characters!
return `XML ${Buffer.byteLength(xmlString, 'utf-8') + 2}\r\n${xmlString}\r\n`
}

function splitAtIndices(text: string, indices: number[]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ function buildResponse(command: string, state?: 'OK' | 'ER', dataOrMessage?: str
// send every item in the array in a separate `data` event/packet
function sendData(socket: net.Socket, response: string[]) {
for (const packet of response) {
const dataBuf = Buffer.from(packet, 'utf-8')
orgSetImmediate(() => {
socket.mockData(dataBuf)
socket.mockData(packet)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ export class BaseConnection extends EventEmitter<ConnectionEvents> {
this._socket.setEncoding('utf-8')

this._socket.on('data', (data) => {
if (typeof data !== 'string') {
// this is against the types, but according to the docs the data will be a string
// the problem of a character split into chunks in transit should be taken care of
// (https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_readable_setencoding_encoding)
throw new Error('Received a non-string even though encoding should have been set to utf-8')
}
this._responseStreamReader.processIncomingData(data)
})
this._socket.on('connect', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,18 @@ export class VMixResponseStreamReader extends EventEmitter<ResponseStreamReaderE
this._lineRemainder = ''
}

processIncomingData(data: Buffer) {
const string = this._lineRemainder + data.toString('utf-8')
this._lineRemainder = ''
const lines = string.split('\r\n')
const lastChunk = lines.pop()
processIncomingData(data: string) {
const remainingData = this._lineRemainder + data
const incomingLines = remainingData.split('\r\n')
const lastChunk = incomingLines.pop()

if (lastChunk != null && lastChunk !== '') {
// Incomplete line found at the end - keep it
this._lineRemainder = lastChunk
} else {
this._lineRemainder = ''
}
this._unprocessedLines.push(...lines)
this._unprocessedLines.push(...incomingLines)

let lineToProcess: string | undefined

Expand Down Expand Up @@ -92,7 +93,7 @@ export class VMixResponseStreamReader extends EventEmitter<ResponseStreamReaderE
}

processedLines.push(line)
responseLen -= line.length + 2
responseLen -= Buffer.byteLength(line, 'utf-8') + 2
}
this._unprocessedLines.splice(0, processedLines.length)
return processedLines.join('\r\n')
Expand Down

0 comments on commit 9b13e3a

Please sign in to comment.