Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/openmedia hotstandby heartbeat logic #105

Merged
merged 12 commits into from
Dec 5, 2024
Merged
19 changes: 18 additions & 1 deletion packages/connector/src/MosConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
this._debug,
this.mosTypes.strict
)
let secondary = null
let secondary: NCSServerConnection | null = null
this._ncsConnections[connectionOptions.primary.host] = primary

primary.on('rawMessage', (type: string, message: string) => {
Expand Down Expand Up @@ -174,6 +174,23 @@ export class MosConnection extends EventEmitter<MosConnectionEvents> implements
false
)
}
// Handle that .openMediaHotStandby should not check for heartbeats on
// the secondary connection when the primary is connected
// And disable heartbeats on the primary when the primary is disconnected
if (connectionOptions.secondary?.openMediaHotStandby) {
// Initially disable heartbeats on secondary since primary should be attempted first
secondary.disableHeartbeats()

primary.on('connectionChanged', () => {
if (primary.connected) {
secondary?.disableHeartbeats()
primary.enableHeartbeats()
} else {
secondary?.enableHeartbeats()
primary.disableHeartbeats()
}
})
}
}

return this._registerMosDevice(
Expand Down
6 changes: 6 additions & 0 deletions packages/connector/src/__mocks__/socket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ export class SocketMock extends EventEmitter implements Socket {

private _responses: Array<ReplyTypes> = []
private _autoReplyToHeartBeat = true
public mockConnectCount = 0

constructor() {
super()
Expand Down Expand Up @@ -102,6 +103,7 @@ export class SocketMock extends EventEmitter implements Socket {
}
// @ts-expect-error mock
connect(port: number, host: string): this {
this.mockConnectCount++
this.connectedPort = port
this.connectedHost = host

Expand Down Expand Up @@ -197,6 +199,10 @@ export class SocketMock extends EventEmitter implements Socket {

this.emit('connect')
}
mockEmitClose(): void {
this.emit('close')
}

mockSentMessage0(data: unknown, encoding: string): void {
if (this._autoReplyToHeartBeat) {
const str: string = typeof data === 'string' ? data : this.decode(data as any)
Expand Down
163 changes: 162 additions & 1 deletion packages/connector/src/__tests__/MosConnection.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { clearMocks, decode, delay, encode, getMessageId, getXMLReply, initMosConnection, setupMocks } from './lib'
import {
clearMocks,
decode,
delay,
encode,
getConnectionsFromDevice,
getMessageId,
getXMLReply,
initMosConnection,
setupMocks,
} from './lib'
import { SocketMock } from '../__mocks__/socket'
import { ServerMock } from '../__mocks__/server'
import { xmlData, xmlApiData } from '../__mocks__/testData'
Expand Down Expand Up @@ -532,6 +542,157 @@ describe('MosDevice: General', () => {
expect(onError).toHaveBeenCalledTimes(0)
expect(onWarning).toHaveBeenCalledTimes(0)

await mos.dispose()
})
test('Hot standby', async () => {
const mos = new MosConnection({
mosID: 'jestMOS',
acceptsConnections: true,
profiles: {
'0': true,
'1': true,
},
})
const onError = jest.fn((e) => console.log(e))
const onWarning = jest.fn((e) => console.log(e))
mos.on('error', onError)
mos.on('warning', onWarning)

expect(mos.acceptsConnections).toBe(true)
await initMosConnection(mos)
expect(mos.isListening).toBe(true)

const mosDevice = await mos.connect({
primary: {
id: 'primary',
host: '192.168.0.1',
timeout: 200,
},
secondary: {
id: 'secondary',
host: '192.168.0.2',
timeout: 200,
openMediaHotStandby: true,
},
})

expect(mosDevice).toBeTruthy()
expect(mosDevice.idPrimary).toEqual('jestMOS_primary')

const connections = getConnectionsFromDevice(mosDevice)
expect(connections.primary).toBeTruthy()
expect(connections.secondary).toBeTruthy()
connections.primary?.setAutoReconnectInterval(300)
connections.secondary?.setAutoReconnectInterval(300)

const onConnectionChange = jest.fn()
mosDevice.onConnectionChange((connectionStatus: IMOSConnectionStatus) => {
onConnectionChange(connectionStatus)
})

expect(SocketMock.instances).toHaveLength(7)
expect(SocketMock.instances[1].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[1].connectedPort).toEqual(10540)
expect(SocketMock.instances[2].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[2].connectedPort).toEqual(10541)
expect(SocketMock.instances[3].connectedHost).toEqual('192.168.0.1')
expect(SocketMock.instances[3].connectedPort).toEqual(10542)

// TODO: Perhaps the hot-standby should not be connected at all at this point?
expect(SocketMock.instances[4].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[4].connectedPort).toEqual(10540)
expect(SocketMock.instances[5].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[5].connectedPort).toEqual(10541)
expect(SocketMock.instances[6].connectedHost).toEqual('192.168.0.2')
expect(SocketMock.instances[6].connectedPort).toEqual(10542)

// Simulate primary connected:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') i.mockEmitConnected()
}
// Wait for the primary to be initially connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected behaviour from a hot standby - we leave it up to the library consumer to decide if this is bad or not
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
onConnectionChange.mockClear()

// Simulate primary disconnect, secondary hot standby takes over:
for (const i of SocketMock.instances) {
i.mockConnectCount = 0
if (i.connectedHost === '192.168.0.1') i.mockEmitClose()
if (i.connectedHost === '192.168.0.2') i.mockEmitConnected()
}

// Wait for the secondary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().SecondaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: false,
PrimaryStatus: expect.stringContaining('Primary'),
SecondaryConnected: true,
SecondaryStatus: 'Secondary: Connected',
})
onConnectionChange.mockClear()

// Simulate that the primary comes back online:
for (const i of SocketMock.instances) {
if (i.connectedHost === '192.168.0.1') {
expect(i.mockConnectCount).toBeGreaterThanOrEqual(1) // should have tried to reconnect
i.mockEmitConnected()
}

if (i.connectedHost === '192.168.0.2') i.mockEmitClose()
}

// Wait for the primary to be connected:
await waitFor(() => mosDevice.getConnectionStatus().PrimaryConnected, 1000)

// Check that the connection status is as we expect:
expect(mosDevice.getConnectionStatus()).toMatchObject({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})
expect(onConnectionChange).toHaveBeenCalled()
expect(onConnectionChange).toHaveBeenLastCalledWith({
PrimaryConnected: true,
PrimaryStatus: 'Primary: Connected',
SecondaryConnected: false, // This is expected from a hot standby
SecondaryStatus: 'Secondary: No heartbeats on port query',
})

await mos.dispose()
})
})
async function waitFor(fcn: () => boolean, timeout: number): Promise<void> {
const startTime = Date.now()

while (Date.now() - startTime < timeout) {
await delay(10)

if (fcn()) return
}
throw new Error('Timeout in waitFor')
}
122 changes: 122 additions & 0 deletions packages/connector/src/__tests__/OpenMediaHotStandby.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import { MosConnection } from "../MosConnection";
import { getMosConnection, setupMocks } from "./lib";
import { NCSServerConnection } from "../connection/NCSServerConnection";

describe('Hot Standby Feature', () => {
let mosConnection: MosConnection;
let primary: NCSServerConnection | null;
let secondary: NCSServerConnection | null;

beforeAll(() => {
setupMocks();
});

beforeEach(async () => {
mosConnection = await getMosConnection({
'0': true,
'1': true,
}, false);

const device = await mosConnection.connect({
primary: {
id: 'primary',
host: '127.0.0.1',
},
secondary: {
id: 'secondary',
host: '127.0.0.2',
openMediaHotStandby: true
}
});

// Wait for connections to be established
await new Promise(resolve => setTimeout(resolve, 100));

primary = device['_primaryConnection'];
secondary = device['_secondaryConnection'];
});

test('should disable secondary heartbeats when primary is connected', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
expect(primary.isHearbeatEnabled()).toBe(true);
expect(secondary.isHearbeatEnabled()).toBe(false);
}
});

test('should enable secondary heartbeats when primary disconnects', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
// Simulate primary disconnect
await primary.dispose();

// Wait for primary to disconnect
await new Promise(resolve => setTimeout(resolve, 100));

// Verify heartbeat states switched correctly
expect(secondary.isHearbeatEnabled()).toBe(true);
expect(primary.isHearbeatEnabled()).toBe(false);
}
});

test('should disable primary heartbeasts when secondary is connected and primary is disconnected', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
// Simulate primary disconnect
await primary.dispose();

// Wait for primary to disconnect
await new Promise(resolve => setTimeout(resolve, 100));

// Wait for secondary to connect
await new Promise(resolve => setTimeout(resolve, 100));

// Verify heartbeat states switched correctly
expect(secondary.isHearbeatEnabled()).toBe(true);
expect(primary.isHearbeatEnabled()).toBe(false);
}
})

test('should handle rapid primary connection changes', async () => {
expect(primary).toBeTruthy();
expect(secondary).toBeTruthy();

if (primary && secondary) {
const connectionStates: boolean[] = [];

// Rapidly toggle primary connection
for (let i = 0; i < 5; i++) {
await primary.dispose();
await new Promise(resolve => setTimeout(resolve, 50));
primary.connect();
await new Promise(resolve => setTimeout(resolve, 50));

connectionStates.push(
secondary.connected,
primary.connected
);
}

// Verify states remained consistent
connectionStates.forEach((state, i) => {
if (i % 2 === 0) {
expect(state).toBe(false); // Secondary should be disabled
} else {
expect(state).toBe(true); // Primary should be enabled
}
});
}
});

afterEach(async () => {
if (mosConnection) {
await mosConnection.dispose();
}
});
});
16 changes: 16 additions & 0 deletions packages/connector/src/__tests__/lib.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { Socket, Server } from 'net'
import { xml2js } from 'xml-js'

import * as iconv from 'iconv-lite'
import { NCSServerConnection } from '../connection/NCSServerConnection'
iconv.encodingExists('utf16-be')

// breaks net.Server, disabled for now
Expand Down Expand Up @@ -284,3 +285,18 @@ function fixSnapshotInner(data: any): [boolean, any] {
}
return [changed, data]
}

export function getConnectionsFromDevice(device: MosDevice): {
primary: NCSServerConnection | null
secondary: NCSServerConnection | null
current: NCSServerConnection | null
} {
return {
// @ts-expect-error private property
primary: device._primaryConnection,
// @ts-expect-error private property
secondary: device._secondaryConnection,
// @ts-expect-error private property
current: device._currentConnection,
}
}
Loading