Skip to content

Commit

Permalink
refactor(vMix): rely on stream encoding for chunked characters support
Browse files Browse the repository at this point in the history
After learning what setEncoding does, parts of the previous commit are reverted and the rest of the code is updated to make use of the fact that the 'data' event supplies a string
  • Loading branch information
ianshade committed Mar 22, 2024
1 parent e811ef0 commit bf4df13
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 84 deletions.
2 changes: 1 addition & 1 deletion packages/timeline-state-resolver/src/__mocks__/net.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export class Socket extends EventEmitter {
public mockClose() {
this.setClosed()
}
public mockData(data: Buffer) {
public mockData(data: string) {
this.emit('data', data)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenCalledWith(
Expand All @@ -31,8 +31,8 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -58,8 +58,8 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION O'))
reader.processIncomingData(Buffer.from('K 27.0.0.49\r\n'))
reader.processIncomingData('VERSION O')
reader.processIncomingData('K 27.0.0.49\r\n')

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenCalledWith(
Expand All @@ -76,9 +76,9 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION'))
reader.processIncomingData(Buffer.from(' ER Error message\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('FUNCTION')
reader.processIncomingData(' ER Error message\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -104,11 +104,11 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49'))
reader.processIncomingData(Buffer.from('\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION'))
reader.processIncomingData(Buffer.from(' ER Error message\r'))
reader.processIncomingData(Buffer.from('\n'))
reader.processIncomingData('VERSION OK 27.0.0.49')
reader.processIncomingData('\r\n')
reader.processIncomingData('FUNCTION')
reader.processIncomingData(' ER Error message\r')
reader.processIncomingData('\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -134,7 +134,7 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION ER Error message\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -160,8 +160,8 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E'))
reader.processIncomingData(Buffer.from('R Error message\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION E')
reader.processIncomingData('R Error message\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -187,9 +187,9 @@ describe('VMixResponseStreamReader', () => {
const onMessage = jest.fn()
reader.on('response', onMessage)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\nFUNCTION E'))
reader.processIncomingData(Buffer.from('R Error message\r\nFUNCTION OK T'))
reader.processIncomingData(Buffer.from('ake\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\nFUNCTION E')
reader.processIncomingData('R Error message\r\nFUNCTION OK T')
reader.processIncomingData('ake\r\n')

expect(onMessage).toHaveBeenCalledTimes(3)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -225,7 +225,7 @@ describe('VMixResponseStreamReader', () => {

const xmlString =
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
reader.processIncomingData(makeXmlMessage(xmlString))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -246,37 +246,7 @@ describe('VMixResponseStreamReader', () => {

const xmlString =
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\🚀\\preset3¾.vmix</preset><inputs><inputs></vmix>'
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
1,
expect.objectContaining({
command: 'XML',
response: 'OK',
body: xmlString,
})
)
})

it('processes a fragmented message with data containing multi-byte characters, split mid-character', async () => {
const reader = new VMixResponseStreamReader()

const onMessage = jest.fn()
reader.on('response', onMessage)

const xmlString = '<vmix>🚀🚀</vmix>'
const xmlMessage = `XML 23\r\n${xmlString}\r\n`
const fullBuffer = Buffer.from(xmlMessage)

const firstPart = fullBuffer.slice(0, 16)
const secondPart = fullBuffer.slice(16)

// sanity check that we did actually split mid-character
expect(firstPart.toString('utf-8') + secondPart.toString('utf-8')).not.toBe(xmlMessage)

reader.processIncomingData(firstPart)
reader.processIncomingData(secondPart)
reader.processIncomingData(makeXmlMessage(xmlString))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -299,7 +269,7 @@ describe('VMixResponseStreamReader', () => {

const xmlString =
'<vmix>\r\n<version>27.0.0.49</version>\r\n<edition>HD</edition>\r\n<preset>C:\\preset.vmix</preset>\r\n<inputs><inputs>\r\n</vmix>'
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
reader.processIncomingData(makeXmlMessage(xmlString))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -323,7 +293,7 @@ describe('VMixResponseStreamReader', () => {
const xmlMessage = makeXmlMessage(xmlString)
splitAtIndices(xmlMessage, [2, 10, 25, 40]).forEach((fragment) => {
expect(fragment.length).toBeGreaterThan(0)
reader.processIncomingData(Buffer.from(fragment))
reader.processIncomingData(fragment)
})

expect(onMessage).toHaveBeenCalledTimes(1)
Expand All @@ -347,7 +317,7 @@ describe('VMixResponseStreamReader', () => {
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
const xmlString2 = '<vmix><version>25.0.0.1</version><edition>4K</edition><inputs><inputs></vmix>'

reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2)))
reader.processIncomingData(makeXmlMessage(xmlString) + makeXmlMessage(xmlString2))

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -378,8 +348,8 @@ describe('VMixResponseStreamReader', () => {
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
const xmlString2 = '<vmix><version>25.0.0.1</version><edition>4K</edition><inputs><inputs></vmix>'

reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString)))
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2)))
reader.processIncomingData(makeXmlMessage(xmlString))
reader.processIncomingData(makeXmlMessage(xmlString2))

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -410,9 +380,9 @@ describe('VMixResponseStreamReader', () => {
'<vmix><version>27.0.0.49</version><edition>HD</edition><preset>C:\\preset.vmix</preset><inputs><inputs></vmix>'
const xmlString2 = '<vmix><version>25.0.0.1</version><edition>4K</edition><inputs><inputs></vmix>'

reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString).substring(0, 44)))
reader.processIncomingData(makeXmlMessage(xmlString).substring(0, 44))
reader.reset()
reader.processIncomingData(Buffer.from(makeXmlMessage(xmlString2)))
reader.processIncomingData(makeXmlMessage(xmlString2))

expect(onMessage).toHaveBeenCalledTimes(1)
expect(onMessage).toHaveBeenNthCalledWith(
Expand All @@ -435,8 +405,8 @@ describe('VMixResponseStreamReader', () => {
const onError = jest.fn()
reader.on('error', onError)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -466,9 +436,9 @@ describe('VMixResponseStreamReader', () => {
const onError = jest.fn()
reader.on('error', onError)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down Expand Up @@ -498,9 +468,9 @@ describe('VMixResponseStreamReader', () => {
const onError = jest.fn()
reader.on('error', onError)

reader.processIncomingData(Buffer.from('VERSION OK 27.0.0.49\r\n'))
reader.processIncomingData(Buffer.from('WASSUP\r\n'))
reader.processIncomingData(Buffer.from('FUNCTION OK Take\r\n'))
reader.processIncomingData('VERSION OK 27.0.0.49\r\n')
reader.processIncomingData('WASSUP\r\n')
reader.processIncomingData('FUNCTION OK Take\r\n')

expect(onMessage).toHaveBeenCalledTimes(2)
expect(onMessage).toHaveBeenNthCalledWith(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,9 +157,8 @@ function buildResponse(command: string, state?: 'OK' | 'ER', dataOrMessage?: str
// send every item in the array in a separate `data` event/packet
function sendData(socket: net.Socket, response: string[]) {
for (const packet of response) {
const dataBuf = Buffer.from(packet, 'utf-8')
orgSetImmediate(() => {
socket.mockData(dataBuf)
socket.mockData(packet)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,12 @@ export class BaseConnection extends EventEmitter<ConnectionEvents> {
this._socket.setEncoding('utf-8')

this._socket.on('data', (data) => {
if (typeof data !== 'string') {
// this is against the types, but according to the docs the data will be a string
// the problem of a character split into chunks in transit should be taken care of
// (https://nodejs.org/docs/latest-v12.x/api/stream.html#stream_readable_setencoding_encoding)
throw new Error('Received a non-string even though encoding should have been set to utf-8')
}
this._responseStreamReader.processIncomingData(data)
})
this._socket.on('connect', () => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,23 @@ const RESPONSE_REGEX = /^(?<command>\w+)\s+(?<response>OK|ER|\d+)(\s+(?<response
*/
export class VMixResponseStreamReader extends EventEmitter<ResponseStreamReaderEvents> {
private _unprocessedLines: string[] = []
private _lineRemainder = Buffer.alloc(0)
private _lineRemainder = ''

reset() {
this._unprocessedLines = []
this._lineRemainder = Buffer.alloc(0)
this._lineRemainder = ''
}

processIncomingData(data: Buffer) {
const remainingData = Buffer.concat([this._lineRemainder, data])
const stringData = remainingData.toString('utf-8')
const incomingLines = stringData.split('\r\n')
processIncomingData(data: string) {
const remainingData = this._lineRemainder + data
const incomingLines = remainingData.split('\r\n')
const lastChunk = incomingLines.pop()

if (lastChunk != null && lastChunk !== '') {
// Incomplete line found at the end - keep it
const linesByteLength = this.calculatePreSplitByteLength(incomingLines)
this._lineRemainder = remainingData.slice(linesByteLength)
this._lineRemainder = lastChunk
} else {
this._lineRemainder = Buffer.alloc(0)
this._lineRemainder = ''
}
this._unprocessedLines.push(...incomingLines)

Expand Down Expand Up @@ -84,12 +82,6 @@ export class VMixResponseStreamReader extends EventEmitter<ResponseStreamReaderE
}
}

private calculatePreSplitByteLength(arrayOfStrings: string[]) {
const totalByteLength = arrayOfStrings.reduce((acc, str) => acc + Buffer.byteLength(str, 'utf-8'), 0)
const additionalBytes = arrayOfStrings.length * 2
return totalByteLength + additionalBytes
}

private processPayloadData(responseLen: number): string | null {
const processedLines: string[] = []

Expand Down

0 comments on commit bf4df13

Please sign in to comment.