Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/openmedia hotstandby heartbeat logic #105

Merged
merged 12 commits into from
Dec 5, 2024
Merged
19 changes: 18 additions & 1 deletion packages/connector/src/MosConnection.ts
Original file line number Diff line number Diff line change
@@ -92,7 +92,7 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
this._debug,
this.mosTypes.strict
)
let secondary = null
let secondary: NCSServerConnection | null = null
this._ncsConnections[connectionOptions.primary.host] = primary

primary.on('rawMessage', (type: string, message: string) => {
@@ -174,6 +174,23 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
false
)
}
// Handle that .openMediaHotStandby should not check for heartbeats on
// the secondary connection when the primary is connected
// And disable heartbeats on the primary when the primary is disconnected
if (connectionOptions.secondary?.openMediaHotStandby) {
// Initially disable heartbeats on secondary since primary should be attempted first
secondary.disableHeartbeats()

primary.on('connectionChanged', () => {
if (primary.connected) {
secondary?.disableHeartbeats()
primary.enableHeartbeats()
} else {
secondary?.enableHeartbeats()
primary.disableHeartbeats()
}
})
}
}

return this._registerMosDevice(
6 changes: 6 additions & 0 deletions packages/connector/src/__mocks__/socket.ts
Original file line number Diff line number Diff line change
@@ -51,6 +51,7 @@ export class SocketMock extends EventEmitter implements Socket {

private _responses: Array<ReplyTypes> = []
private _autoReplyToHeartBeat = true
public mockConnectCount = 0

constructor() {
super()
@@ -102,6 +103,7 @@ export class SocketMock extends EventEmitter implements Socket {
}
// @ts-expect-error mock
connect(port: number, host: string): this {
this.mockConnectCount++
this.connectedPort = port
this.connectedHost = host

@@ -197,6 +199,10 @@ export class SocketMock extends EventEmitter implements Socket {

this.emit('connect')
}
mockEmitClose(): void {
this.emit('close')
}

mockSentMessage0(data: unknown, encoding: string): void {
if (this._autoReplyToHeartBeat) {
const str: string = typeof data === 'string' ? data : this.decode(data as any)
163 changes: 162 additions & 1 deletion packages/connector/src/__tests__/MosConnection.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { clearMocks, decode, delay, encode, getMessageId, getXMLReply, initMosConnection, setupMocks } from './lib'
import {
clearMocks,
decode,
delay,
encode,
getConnectionsFromDevice,
getMessageId,
getXMLReply,
initMosConnection,
setupMocks,
} from './lib'
import { SocketMock } from '../__mocks__/socket'
import { ServerMock } from '../__mocks__/server'
import { xmlData, xmlApiData } from '../__mocks__/testData'
@@ -532,6 +542,157 @@ describe('MosDevice: General', () => {
expect(onError).toHaveBeenCalledTimes(0)
expect(onWarning).toHaveBeenCalledTimes(0)

await mos.dispose()
})
test('Hot standby', async () => {
const mos = new MosConnection({
mosID: 'jestMOS',
acceptsConnections: true,
profiles: {
'0': true,
'1': true,
},
})
const onError = jest.fn((e) => console.log(e))
const onWarning = jest.fn((e) => console.log(e))
mos.on('error', onError)
mos.on('warning', onWarning)

expect(mos.acceptsConnections).toBe(true)
await initMosConnection(mos)
expect(mos.isListening).toBe(true)

const mosDevice = await mos.connect({
primary: {
id: 'primary',
host: '192.168.0.1',
timeout: 200,
},
secondary: {
id: 'secondary',
host: '192.168.0.2',
timeout: 200,
openMediaHotStandby: true,
},
})

expect(mosDevice).toBeTruthy()
expect(mosDevice.idPrimary).toEqual('jestMOS_primary')

const connections = getConnectionsFromDevice(mosDevice)
expect(connections.primary).toBeTruthy()
expect(connections.secondary).toBeTruthy()
connections.primary?.setAutoReconnectInterval(300)
connections.secondary?.setAutoReconnectInterval(300)

const onConnectionChange = jest.fn()
mosDevice.onConnectionChange((connectionStatus: IMOSConnectionStatus) => {
onConnectionChange(connectionStatus)
})

expect(SocketMock.instances).toHaveLength(7)
expect(SocketMock.instances[1].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[1].connectedPort).toEqual(10540)
expect(SocketMock.instances[2].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[2].connectedPort).toEqual(10541)
expect(SocketMock.instances[3].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[3].connectedPort).toEqual(10542)

// TODO: Perhaps the hot-standby should not be connected at all at this point?
expect(SocketMock.instances[4].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[4].connectedPort).toEqual(10540)
expect(SocketMock.instances[5].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[5].connectedPort).toEqual(10541)
expect(SocketMock.instances[6].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[6].connectedPort).toEqual(10542)

// Simulate primary connected:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') i.mockEmitConnected()
}
// Wait for the primary to be initially connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected behaviour from a hot standby - we leave it up to the library consumer to decide if this is bad or not
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
onConnectionChange.mockClear()

// Simulate primary disconnect, secondary hot standby takes over:
for (const i of SocketMock.instances) {
i.mockConnectCount = 0
if (i.connectedHost === '192.168.0.1') i.mockEmitClose()
if (i.connectedHost === '192.168.0.2') i.mockEmitConnected()
}

// Wait for the secondary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().SecondaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
onConnectionChange.mockClear()

// Simulate that the primary comes back online:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') {
expect(i.mockConnectCount).toBeGreaterThanOrEqual(1) // should have tried to reconnect
i.mockEmitConnected()
}

if (i.connectedHost === '192.168.0.2') i.mockEmitClose()
}

// Wait for the primary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})

await mos.dispose()
})
})
async function waitFor(fcn: () => boolean, timeout: number): Promise<void> {
const startTime = Date.now()

while (Date.now() - startTime < timeout) {
await delay(10)

if (fcn()) return
}
throw new Error('Timeout in waitFor')
}
122 changes: 122 additions & 0 deletions packages/connector/src/__tests__/OpenMediaHotStandby.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { MosConnection } from "../MosConnection";
import { getMosConnection, setupMocks } from "./lib";
import { NCSServerConnection } from "../connection/NCSServerConnection";

describe('Hot Standby Feature', () => {
let mosConnection: MosConnection;
let primary: NCSServerConnection | null;
let secondary: NCSServerConnection | null;

beforeAll(() => {
setupMocks();
});

beforeEach(async () => {
mosConnection = await getMosConnection({
'0': true,
'1': true,
}, false);

const device = await mosConnection.connect({
primary: {
id: 'primary',
host: '127.0.0.1',
},
secondary: {
id: 'secondary',
host: '127.0.0.2',
openMediaHotStandby: true
}
});

// Wait for connections to be established
await new Promise(resolve => setTimeout(resolve, 100));

primary = device['_primaryConnection'];
secondary = device['_secondaryConnection'];
});

test('should disable secondary heartbeats when primary is connected', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
expect(primary.isHearbeatEnabled()).toBe(true);
expect(secondary.isHearbeatEnabled()).toBe(false);
}
});

test('should enable secondary heartbeats when primary disconnects', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
// Simulate primary disconnect
await primary.dispose();

// Wait for primary to disconnect
await new Promise(resolve => setTimeout(resolve, 100));

// Verify heartbeat states switched correctly
expect(secondary.isHearbeatEnabled()).toBe(true);
expect(primary.isHearbeatEnabled()).toBe(false);
}
});

test('should disable primary heartbeasts when secondary is connected and primary is disconnected', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
// Simulate primary disconnect
await primary.dispose();

// Wait for primary to disconnect
await new Promise(resolve => setTimeout(resolve, 100));

// Wait for secondary to connect
await new Promise(resolve => setTimeout(resolve, 100));

// Verify heartbeat states switched correctly
expect(secondary.isHearbeatEnabled()).toBe(true);
expect(primary.isHearbeatEnabled()).toBe(false);
}
})

test('should handle rapid primary connection changes', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
const connectionStates: boolean[] = [];

// Rapidly toggle primary connection
for (let i = 0; i < 5; i++) {
await primary.dispose();
await new Promise(resolve => setTimeout(resolve, 50));
primary.connect();
await new Promise(resolve => setTimeout(resolve, 50));

connectionStates.push(
secondary.connected,
primary.connected
);
}

// Verify states remained consistent
connectionStates.forEach((state, i) => {
if (i % 2 === 0) {
expect(state).toBe(false); // Secondary should be disabled
} else {
expect(state).toBe(true); // Primary should be enabled
}
});
}
});

afterEach(async () => {
if (mosConnection) {
await mosConnection.dispose();
}
});
});
16 changes: 16 additions & 0 deletions packages/connector/src/__tests__/lib.ts
Original file line number Diff line number Diff line change
@@ -9,6 +9,7 @@ import { Socket, Server } from 'net'
import { xml2js } from 'xml-js'

import * as iconv from 'iconv-lite'
import { NCSServerConnection } from '../connection/NCSServerConnection'
iconv.encodingExists('utf16-be')

// breaks net.Server, disabled for now
@@ -284,3 +285,18 @@ function fixSnapshotInner(data: any): [boolean, any] {
}
return [changed, data]
}

export function getConnectionsFromDevice(device: MosDevice): {
primary: NCSServerConnection | null
secondary: NCSServerConnection | null
current: NCSServerConnection | null
} {
return {
// @ts-expect-error private property
primary: device._primaryConnection,
// @ts-expect-error private property
secondary: device._secondaryConnection,
// @ts-expect-error private property
current: device._currentConnection,
}
}
Loading