From 0f63bbdc951e3aec7dbd9930087a34f05dfcfa2f Mon Sep 17 00:00:00 2001 From: Artem Derevnjuk Date: Fri, 8 Mar 2024 11:14:08 +0400 Subject: [PATCH] fix(repeater): enhance repeater lifecycle management (#519) * Improve connection handling by refining event listeners and error handling * Refine repeater event types and interfaces for better type safety and clarity relates-to: #515 --- src/Config/CliBuilder.ts | 2 +- src/Repeater/DefaultRepeaterServer.ts | 300 +++++++++++++++---------- src/Repeater/RepeaterServer.ts | 99 ++++---- src/Repeater/ServerRepeaterLauncher.ts | 130 +++++++---- 4 files changed, 330 insertions(+), 201 deletions(-) diff --git a/src/Config/CliBuilder.ts b/src/Config/CliBuilder.ts index 3e53c7aa..cd0f6a31 100644 --- a/src/Config/CliBuilder.ts +++ b/src/Config/CliBuilder.ts @@ -129,8 +129,8 @@ export class CliBuilder { private initSentry(dsn: string) { init({ - attachStacktrace: true, dsn, + attachStacktrace: true, release: process.env.VERSION, beforeSend(event) { if (event.contexts.args) { diff --git a/src/Repeater/DefaultRepeaterServer.ts b/src/Repeater/DefaultRepeaterServer.ts index 2108f7e4..52bfebc7 100644 --- a/src/Repeater/DefaultRepeaterServer.ts +++ b/src/Repeater/DefaultRepeaterServer.ts @@ -5,6 +5,9 @@ import { RepeaterServer, RepeaterServerDeployedEvent, RepeaterServerErrorEvent, + RepeaterServerEventHandler, + RepeaterServerEvents, + RepeaterServerEventsMap, RepeaterServerNetworkTestEvent, RepeaterServerNetworkTestResult, RepeaterServerReconnectionAttemptedEvent, @@ -18,7 +21,7 @@ import { inject, injectable } from 'tsyringe'; import io, { Socket } from 'socket.io-client'; import parser from 'socket.io-msgpack-parser'; import { captureException, captureMessage } from '@sentry/node'; -import { once } from 'events'; +import { EventEmitter, once } from 'events'; import Timer = NodeJS.Timer; export interface DefaultRepeaterServerOptions { @@ -33,13 +36,75 @@ export const DefaultRepeaterServerOptions: unique symbol = Symbol( 'DefaultRepeaterServerOptions' ); +type CallbackFunction = (arg: T) => unknown; +type HandlerFunction = (args: unknown[]) => unknown; + +const enum SocketEvents { + DEPLOYED = 'deployed', + DEPLOY = 'deploy', + UNDEPLOY = 'undeploy', + UNDEPLOYED = 'undeployed', + TEST_NETWORK = 'test-network', + ERROR = 'error', + UPDATE_AVAILABLE = 'update-available', + SCRIPT_UPDATED = 'scripts-updated', + PING = 'ping', + REQUEST = 'request' +} + +interface SocketListeningEventMap { + [SocketEvents.DEPLOYED]: (event: RepeaterServerDeployedEvent) => void; + [SocketEvents.UNDEPLOYED]: () => void; + [SocketEvents.ERROR]: (event: RepeaterServerErrorEvent) => void; + [SocketEvents.TEST_NETWORK]: ( + event: RepeaterServerNetworkTestEvent, + callback: CallbackFunction + ) => void; + [SocketEvents.UPDATE_AVAILABLE]: ( + event: RepeaterUpgradeAvailableEvent + ) => void; + [SocketEvents.SCRIPT_UPDATED]: ( + event: RepeaterServerScriptsUpdatedEvent + ) => void; + [SocketEvents.REQUEST]: ( + request: RepeaterServerRequestEvent, + callback: CallbackFunction + ) => void; +} + +interface SocketEmitEventMap { + [SocketEvents.DEPLOY]: ( + options: DeployCommandOptions, + runtime?: DeploymentRuntime + ) => void; + [SocketEvents.UNDEPLOY]: () => void; + [SocketEvents.PING]: () => void; +} + @injectable() export class DefaultRepeaterServer implements RepeaterServer { - private latestReconnectionError?: Error; + private readonly MAX_DEPLOYMENT_TIMEOUT = 60_000; + private readonly MAX_PING_INTERVAL = 10_000; private readonly MAX_RECONNECTION_ATTEMPTS = 20; - private readonly MAX_RECONNECTION_DELAY = 86400000; - private _socket?: Socket; + private readonly MAX_RECONNECTION_DELAY = 86_400_000; + private readonly events = new EventEmitter(); + private readonly handlerMap = new WeakMap< + RepeaterServerEventHandler, + HandlerFunction + >(); + private latestReconnectionError?: Error; private timer?: Timer; + private _socket?: Socket; + + private get socket() { + if (!this._socket) { + throw new Error( + 'Please make sure that repeater established a connection with host.' + ); + } + + return this._socket; + } constructor( @inject(ProxyFactory) private readonly proxyFactory: ProxyFactory, @@ -48,6 +113,7 @@ export class DefaultRepeaterServer implements RepeaterServer { ) {} public disconnect() { + this.events.removeAllListeners(); this.clearPingTimer(); this._socket?.disconnect(); @@ -59,17 +125,27 @@ export class DefaultRepeaterServer implements RepeaterServer { options: DeployCommandOptions, runtime: DeploymentRuntime ): Promise { - process.nextTick(() => this.socket.emit('deploy', options, runtime)); - - const [result]: RepeaterServerDeployedEvent[] = await once( - this.socket, - 'deployed' + process.nextTick(() => + this.socket.emit(SocketEvents.DEPLOY, options, runtime) ); + const [result]: RepeaterServerDeployedEvent[] = await Promise.race([ + once(this.socket, SocketEvents.DEPLOYED), + new Promise((_, reject) => + setTimeout( + reject, + this.MAX_DEPLOYMENT_TIMEOUT, + new Error('No response.') + ).unref() + ) + ]); + + this.createPingTimer(); + return result; } - public connect(hostname: string) { + public async connect(hostname: string) { this._socket = io(this.options.uri, { parser, path: '/api/ws/v1', @@ -93,156 +169,144 @@ export class DefaultRepeaterServer implements RepeaterServer { } }); - this.socket.on('connect_error', (error: Error) => - logger.debug(`Unable to connect to the %s host`, this.options.uri, error) - ); - - this.createPingTimer(); + this.listenToReservedEvents(); + this.listenToApplicationEvents(); - logger.debug('Event bus connected to %s', this.options.uri); - } + await Promise.race([ + once(this.socket, 'connect'), + once(this.socket, 'connect_error').then(([error]: Error[]) => { + throw error; + }) + ]); - public connected(handler: () => void | Promise): void { - this.socket.on('connect', () => - this.processEventHandler('connect', undefined, handler) - ); + logger.debug('Repeater connected to %s', this.options.uri); } - public requestReceived( - handler: ( - event: RepeaterServerRequestEvent - ) => RepeaterServerRequestResponse | Promise + public off( + event: K, + handler?: RepeaterServerEventHandler ): void { - this.socket.on('request', (payload, callback) => - this.processEventHandler('request', payload, handler, callback) - ); + const wrappedHandler = this.handlerMap.get(handler); + if (wrappedHandler) { + this.events.off(event, wrappedHandler); + this.handlerMap.delete(handler); + } } - public networkTesting( - handler: ( - event: RepeaterServerNetworkTestEvent - ) => - | RepeaterServerNetworkTestResult - | Promise + public on( + event: K, + handler: RepeaterServerEventHandler ): void { - this.socket.on('test-network', (payload, callback) => - this.processEventHandler('test-network', payload, handler, callback) - ); + const wrappedHandler = (...args: unknown[]) => + this.wrapEventListener(event, handler, ...args); + this.handlerMap.set(handler, wrappedHandler); + this.events.on(event, wrappedHandler); } - public upgradeAvailable( - handler: (event: RepeaterUpgradeAvailableEvent) => Promise | void - ): void { - this.socket.on('update-available', (payload, callback) => - this.processEventHandler('update-available', payload, handler, callback) + private listenToApplicationEvents() { + this.socket.on(SocketEvents.DEPLOYED, (event) => + this.events.emit(RepeaterServerEvents.DEPLOY, event) + ); + this.socket.on(SocketEvents.REQUEST, (event, callback) => + this.events.emit(RepeaterServerEvents.REQUEST, event, callback) + ); + this.socket.on(SocketEvents.TEST_NETWORK, (event, callback) => + this.events.emit(RepeaterServerEvents.TEST_NETWORK, event, callback) + ); + this.socket.on(SocketEvents.ERROR, (event) => { + captureMessage(event.message); + this.events.emit(RepeaterServerEvents.ERROR, event); + }); + this.socket.on(SocketEvents.UPDATE_AVAILABLE, (event) => + this.events.emit(RepeaterServerEvents.UPDATE_AVAILABLE, event) + ); + this.socket.on(SocketEvents.SCRIPT_UPDATED, (event) => + this.events.emit(RepeaterServerEvents.SCRIPTS_UPDATED, event) ); } - public scriptsUpdated( - handler: (event: RepeaterServerScriptsUpdatedEvent) => Promise | void - ): void { - this.socket.on('scripts-updated', (payload, callback) => - this.processEventHandler('scripts-updated', payload, handler, callback) + private listenToReservedEvents() { + this.socket.on('connect', () => + this.events.emit(RepeaterServerEvents.CONNECTED) ); - } + this.socket.on('disconnect', (reason) => { + if (reason !== 'io client disconnect') { + this.events.emit(RepeaterServerEvents.DISCONNECTED); + } - public reconnectionFailed( - handler: ( - event: RepeaterServerReconnectionFailedEvent - ) => void | Promise - ): void { + // the disconnection was initiated by the server, you need to reconnect manually + if (reason === 'io server disconnect') { + this.socket.connect(); + } + }); this.socket.io.on('reconnect', () => { this.latestReconnectionError = undefined; }); - this.socket.io.on( 'reconnect_error', (error) => (this.latestReconnectionError = error) ); - this.socket.io.on('reconnect_failed', () => - this.processEventHandler( - 'reconnection_failed', - { - error: this.latestReconnectionError - }, - handler - ) + this.events.emit(RepeaterServerEvents.RECONNECTION_FAILED, { + error: this.latestReconnectionError + } as RepeaterServerReconnectionFailedEvent) ); - } - - public errorOccurred( - handler: (event: RepeaterServerErrorEvent) => void | Promise - ): void { - this.socket.on('error', (payload, callback) => { - captureMessage(payload.message); - - return this.processEventHandler('error', payload, handler, callback); - }); - } - - public reconnectionAttempted( - handler: ( - event: RepeaterServerReconnectionAttemptedEvent - ) => void | Promise - ): void { this.socket.io.on('reconnect_attempt', (attempt) => - this.processEventHandler( - 'reconnect_attempt', - { attempt, maxAttempts: this.MAX_RECONNECTION_ATTEMPTS }, - handler - ) + this.events.emit(RepeaterServerEvents.RECONNECT_ATTEMPT, { + attempt, + maxAttempts: this.MAX_RECONNECTION_ATTEMPTS + } as RepeaterServerReconnectionAttemptedEvent) ); - } - - public reconnectionSucceeded(handler: () => void | Promise): void { this.socket.io.on('reconnect', () => - this.processEventHandler('reconnect', undefined, handler) + this.events.emit(RepeaterServerEvents.RECONNECTION_SUCCEEDED) ); } - private get socket() { - if (!this._socket) { - throw new Error( - 'Please make sure that repeater established a connection with host.' - ); - } - - return this._socket; - } - - private async processEventHandler

( + private async wrapEventListener( event: string, - payload: P, - handler: (payload: P) => unknown, - callback?: unknown + handler: (...payload: TArgs) => unknown, + ...args: unknown[] ) { try { - const response = await handler(payload); + const callback = this.extractLastArgument(args); - if (typeof callback !== 'function') { - return; - } + // eslint-disable-next-line @typescript-eslint/return-await + const response = await handler(...(args as TArgs)); - callback(response); - } catch (error) { - captureException(error); - logger.debug( - 'Error processing event "%s" with the following payload: %s. Details: %s', - event, - payload, - error - ); - logger.error(error); + callback?.(response); + } catch (err) { + this.handleEventError(err, event, args); } } + private extractLastArgument(args: unknown[]): CallbackFunction | undefined { + const lastArg = args.pop(); + if (typeof lastArg === 'function') { + return lastArg as CallbackFunction; + } else { + // If the last argument is not a function, add it back to the args array + args.push(lastArg); + + return undefined; + } + } + + private handleEventError(error: Error, event: string, args: unknown[]): void { + captureException(error); + logger.debug( + 'An error occurred while processing the %s event with the following payload: %j', + event, + args + ); + logger.error(error); + } + private createPingTimer() { this.clearPingTimer(); this.timer = setInterval( - () => this.socket.volatile.emit('ping'), - 10000 + () => this.socket.volatile.emit(SocketEvents.PING), + this.MAX_PING_INTERVAL ).unref(); } diff --git a/src/Repeater/RepeaterServer.ts b/src/Repeater/RepeaterServer.ts index db1901e6..a679ec06 100644 --- a/src/Repeater/RepeaterServer.ts +++ b/src/Repeater/RepeaterServer.ts @@ -59,8 +59,21 @@ export interface RepeaterServerReconnectionAttemptedEvent { maxAttempts: number; } +export enum RepeaterErrorCodes { + REPEATER_NOT_PERMITTED = 'repeater_not_permitted', + REPEATER_ALREADY_STARTED = 'repeater_already_started', + REPEATER_DEACTIVATED = 'repeater_deactivated', + REPEATER_UNAUTHORIZED = 'repeater_unauthorized', + REPEATER_NO_LONGER_SUPPORTED = 'repeater_no_longer_supported', + UNKNOWN_ERROR = 'unknown_error', + UNEXPECTED_ERROR = 'unexpected_error' +} + export interface RepeaterServerErrorEvent { message: string; + code: RepeaterErrorCodes; + transaction?: string; + remediation?: string; } export interface RepeaterUpgradeAvailableEvent { @@ -86,56 +99,64 @@ export interface DeploymentRuntime { nodeVersion?: string; } +export const enum RepeaterServerEvents { + DEPLOYED = 'deployed', + DEPLOY = 'deploy', + CONNECTED = 'connected', + DISCONNECTED = 'disconnected', + REQUEST = 'request', + TEST_NETWORK = 'test_network', + UPDATE_AVAILABLE = 'update_available', + SCRIPTS_UPDATED = 'scripts_updated', + RECONNECTION_FAILED = 'reconnection_failed', + RECONNECT_ATTEMPT = 'reconnect_attempt', + RECONNECTION_SUCCEEDED = 'reconnection_succeeded', + ERROR = 'error', + PING = 'ping' +} + +export interface RepeaterServerEventsMap { + [RepeaterServerEvents.DEPLOY]: [DeployCommandOptions, DeploymentRuntime?]; + [RepeaterServerEvents.DEPLOYED]: RepeaterServerDeployedEvent; + [RepeaterServerEvents.CONNECTED]: void; + [RepeaterServerEvents.DISCONNECTED]: void; + [RepeaterServerEvents.REQUEST]: RepeaterServerRequestEvent; + [RepeaterServerEvents.TEST_NETWORK]: RepeaterServerNetworkTestEvent; + [RepeaterServerEvents.UPDATE_AVAILABLE]: RepeaterUpgradeAvailableEvent; + [RepeaterServerEvents.SCRIPTS_UPDATED]: RepeaterServerScriptsUpdatedEvent; + [RepeaterServerEvents.RECONNECTION_FAILED]: RepeaterServerReconnectionFailedEvent; + [RepeaterServerEvents.RECONNECT_ATTEMPT]: RepeaterServerReconnectionAttemptedEvent; + [RepeaterServerEvents.RECONNECTION_SUCCEEDED]: void; + [RepeaterServerEvents.ERROR]: RepeaterServerErrorEvent; + [RepeaterServerEvents.PING]: void; +} + +export type RepeaterServerEventHandler< + K extends keyof RepeaterServerEventsMap +> = ( + ...args: RepeaterServerEventsMap[K] extends (infer U)[] + ? U[] + : [RepeaterServerEventsMap[K]] +) => unknown; + export interface RepeaterServer { disconnect(): void; - connect(hostname: string): void; + connect(hostname: string): Promise; deploy( options: DeployCommandOptions, runtime?: DeploymentRuntime ): Promise; - scriptsUpdated( - handler: (event: RepeaterServerScriptsUpdatedEvent) => Promise | void + on( + event: K, + handler: RepeaterServerEventHandler ): void; - upgradeAvailable( - handler: (event: RepeaterUpgradeAvailableEvent) => Promise | void - ): void; - - networkTesting( - handler: ( - event: RepeaterServerNetworkTestEvent - ) => - | RepeaterServerNetworkTestResult - | Promise - ): void; - - requestReceived( - handler: ( - event: RepeaterServerRequestEvent - ) => RepeaterServerRequestResponse | Promise - ): void; - - reconnectionFailed( - handler: ( - event: RepeaterServerReconnectionFailedEvent - ) => void | Promise - ): void; - - reconnectionAttempted( - handler: ( - event: RepeaterServerReconnectionAttemptedEvent - ) => void | Promise - ): void; - - reconnectionSucceeded(handler: () => void | Promise): void; - - connected(handler: () => void | Promise): void; - - errorOccurred( - handler: (event: RepeaterServerErrorEvent) => void | Promise + off( + event: K, + handler?: RepeaterServerEventHandler ): void; } diff --git a/src/Repeater/ServerRepeaterLauncher.ts b/src/Repeater/ServerRepeaterLauncher.ts index da616bbf..a7bae128 100644 --- a/src/Repeater/ServerRepeaterLauncher.ts +++ b/src/Repeater/ServerRepeaterLauncher.ts @@ -1,7 +1,10 @@ import { RepeaterLauncher } from './RepeaterLauncher'; import { DeploymentRuntime, + RepeaterErrorCodes, RepeaterServer, + RepeaterServerErrorEvent, + RepeaterServerEvents, RepeaterServerNetworkTestEvent, RepeaterServerReconnectionFailedEvent, RepeaterServerRequestEvent @@ -19,8 +22,7 @@ import { captureException } from '@sentry/node'; @injectable() export class ServerRepeaterLauncher implements RepeaterLauncher { - private static SERVICE_NAME = 'bright-repeater'; - private repeaterStarted: boolean = false; + private static readonly SERVICE_NAME = 'bright-repeater'; private repeaterRunning: boolean = false; private repeaterId: string | undefined; @@ -39,7 +41,7 @@ export class ServerRepeaterLauncher implements RepeaterLauncher { public close() { this.repeaterRunning = false; - this.repeaterStarted = false; + this.repeaterServer.disconnect(); return Promise.resolve(); @@ -99,8 +101,12 @@ export class ServerRepeaterLauncher implements RepeaterLauncher { logger.log('Starting the Repeater (%s)...', this.info.version); this.repeaterId = repeaterId; - this.repeaterServer.connect(repeaterId); + this.subscribeToEvents(); + + await this.repeaterServer.connect(this.repeaterId); + + logger.log('The Repeater (%s) started', this.info.version); } private getRuntime(): DeploymentRuntime { @@ -117,62 +123,100 @@ export class ServerRepeaterLauncher implements RepeaterLauncher { } private subscribeToEvents() { - this.repeaterServer.connected(async () => { - await this.repeaterServer.deploy( - { - repeaterId: this.repeaterId - }, - this.getRuntime() - ); - - if (!this.repeaterStarted) { - this.repeaterStarted = true; - - logger.log('The Repeater (%s) started', this.info.version); - } - }); - this.repeaterServer.errorOccurred(({ message }) => { - logger.error(`%s: %s`, chalk.red('(!) CRITICAL'), message); - }); - this.repeaterServer.reconnectionFailed((payload) => - this.reconnectionFailed(payload) - ); - this.repeaterServer.requestReceived((payload) => - this.requestReceived(payload) + this.repeaterServer.on(RepeaterServerEvents.CONNECTED, this.deployRepeater); + this.repeaterServer.on(RepeaterServerEvents.ERROR, this.handleError); + this.repeaterServer.on( + RepeaterServerEvents.RECONNECTION_FAILED, + this.reconnectionFailed ); - this.repeaterServer.networkTesting((payload) => - this.testingNetwork(payload) + this.repeaterServer.on(RepeaterServerEvents.REQUEST, this.requestReceived); + this.repeaterServer.on( + RepeaterServerEvents.TEST_NETWORK, + this.testingNetwork ); - this.repeaterServer.scriptsUpdated((payload) => + this.repeaterServer.on(RepeaterServerEvents.SCRIPTS_UPDATED, (payload) => this.commandHub.compileScripts(payload.script) ); - this.repeaterServer.upgradeAvailable((payload) => + this.repeaterServer.on(RepeaterServerEvents.UPDATE_AVAILABLE, (payload) => logger.warn( '%s: A new Repeater version (%s) is available, for update instruction visit https://docs.brightsec.com/docs/installation-options', chalk.yellow('(!) IMPORTANT'), payload.version ) ); - this.repeaterServer.reconnectionAttempted(({ attempt, maxAttempts }) => - logger.warn( - 'Failed to connect to Bright cloud (attempt %d/%d)', - attempt, - maxAttempts - ) + this.repeaterServer.on( + RepeaterServerEvents.RECONNECT_ATTEMPT, + ({ attempt, maxAttempts }) => + logger.warn( + 'Failed to connect to Bright cloud (attempt %d/%d)', + attempt, + maxAttempts + ) ); - this.repeaterServer.reconnectionSucceeded(() => + this.repeaterServer.on(RepeaterServerEvents.RECONNECTION_SUCCEEDED, () => logger.log('The Repeater (%s) connected', this.info.version) ); } - private reconnectionFailed({ error }: RepeaterServerReconnectionFailedEvent) { + private handleError = ({ + code, + message, + remediation + }: RepeaterServerErrorEvent) => { + const normalizedMessage = this.normalizeMessage(message); + const normalizedRemediation = this.normalizeMessage(remediation ?? ''); + + if (this.isCriticalError(code)) { + this.handleCriticalError(normalizedMessage, normalizedRemediation); + } else { + logger.error(normalizedMessage); + } + }; + + private normalizeMessage(message: string): string { + return message.replace(/\.$/, ''); + } + + private isCriticalError(code: RepeaterErrorCodes): boolean { + return [ + RepeaterErrorCodes.REPEATER_DEACTIVATED, + RepeaterErrorCodes.REPEATER_NO_LONGER_SUPPORTED, + RepeaterErrorCodes.REPEATER_UNAUTHORIZED, + RepeaterErrorCodes.REPEATER_ALREADY_STARTED, + RepeaterErrorCodes.REPEATER_NOT_PERMITTED, + RepeaterErrorCodes.UNEXPECTED_ERROR + ].includes(code); + } + + private handleCriticalError(message: string, remediation: string): void { + logger.error('%s: %s. %s', chalk.red('(!) CRITICAL'), message, remediation); + this.close().catch(logger.error); + process.exitCode = 1; + } + + private deployRepeater = async () => { + try { + await this.repeaterServer.deploy( + { + repeaterId: this.repeaterId + }, + this.getRuntime() + ); + } catch { + // noop + } + }; + + private reconnectionFailed = ({ + error + }: RepeaterServerReconnectionFailedEvent) => { captureException(error); logger.error(error); this.close().catch(logger.error); process.exitCode = 1; - } + }; - private async testingNetwork(event: RepeaterServerNetworkTestEvent) { + private testingNetwork = async (event: RepeaterServerNetworkTestEvent) => { try { const output = await this.commandHub.testNetwork(event.type, event.input); @@ -184,9 +228,9 @@ export class ServerRepeaterLauncher implements RepeaterLauncher { error: typeof e === 'string' ? e : (e as Error).message }; } - } + }; - private async requestReceived(event: RepeaterServerRequestEvent) { + private requestReceived = async (event: RepeaterServerRequestEvent) => { const response = await this.commandHub.sendRequest( new Request({ ...event }) ); @@ -210,5 +254,5 @@ export class ServerRepeaterLauncher implements RepeaterLauncher { message, encoding }; - } + }; }