Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(repeater): reconnect on low-level connection errors #520

Merged
merged 1 commit into from
Mar 9, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 116 additions & 23 deletions src/Repeater/DefaultRepeaterServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { logger, ProxyFactory } from '../Utils';
import {
DeployCommandOptions,
DeploymentRuntime,
RepeaterErrorCodes,
RepeaterServer,
RepeaterServerDeployedEvent,
RepeaterServerErrorEvent,
Expand All @@ -21,6 +22,7 @@ import { inject, injectable } from 'tsyringe';
import io, { Socket } from 'socket.io-client';
import parser from 'socket.io-msgpack-parser';
import { captureException, captureMessage } from '@sentry/node';
import { ErrorEvent } from 'ws';
import { EventEmitter, once } from 'events';
import Timer = NodeJS.Timer;

Expand Down Expand Up @@ -86,15 +88,18 @@ export class DefaultRepeaterServer implements RepeaterServer {
private readonly MAX_DEPLOYMENT_TIMEOUT = 60_000;
private readonly MAX_PING_INTERVAL = 10_000;
private readonly MAX_RECONNECTION_ATTEMPTS = 20;
private readonly MIN_RECONNECTION_DELAY = 1000;
private readonly MAX_RECONNECTION_DELAY = 86_400_000;
private readonly events = new EventEmitter();
private readonly handlerMap = new WeakMap<
RepeaterServerEventHandler<any>,
HandlerFunction
>();
private latestReconnectionError?: Error;
private timer?: Timer;
private pingTimer?: Timer;
private connectionTimer?: Timer;
private _socket?: Socket<SocketListeningEventMap, SocketEmitEventMap>;
private connectionAttempts = 0;

private get socket() {
if (!this._socket) {
Expand All @@ -115,6 +120,7 @@ export class DefaultRepeaterServer implements RepeaterServer {
public disconnect() {
this.events.removeAllListeners();
this.clearPingTimer();
this.clearConnectionTimer();

this._socket?.disconnect();
this._socket?.removeAllListeners();
Expand Down Expand Up @@ -151,6 +157,7 @@ export class DefaultRepeaterServer implements RepeaterServer {
path: '/api/ws/v1',
transports: ['websocket'],
reconnectionDelayMax: this.MAX_RECONNECTION_DELAY,
reconnectionDelay: this.MIN_RECONNECTION_DELAY,
timeout: this.options?.connectTimeout,
rejectUnauthorized: !this.options.insecure,
// @ts-expect-error Type is wrong.
Expand All @@ -172,12 +179,7 @@ export class DefaultRepeaterServer implements RepeaterServer {
this.listenToReservedEvents();
this.listenToApplicationEvents();

await Promise.race([
once(this.socket, 'connect'),
once(this.socket, 'connect_error').then(([error]: Error[]) => {
throw error;
})
]);
await once(this.socket, 'connect');

logger.debug('Repeater connected to %s', this.options.uri);
}
Expand Down Expand Up @@ -226,19 +228,9 @@ export class DefaultRepeaterServer implements RepeaterServer {
}

private listenToReservedEvents() {
this.socket.on('connect', () =>
this.events.emit(RepeaterServerEvents.CONNECTED)
);
this.socket.on('disconnect', (reason) => {
if (reason !== 'io client disconnect') {
this.events.emit(RepeaterServerEvents.DISCONNECTED);
}

// the disconnection was initiated by the server, you need to reconnect manually
if (reason === 'io server disconnect') {
this.socket.connect();
}
});
this.socket.on('connect', this.handleConnect);
this.socket.on('connect_error', this.handleConnectionError);
this.socket.on('disconnect', this.handleDisconnect);
this.socket.io.on('reconnect', () => {
this.latestReconnectionError = undefined;
});
Expand All @@ -262,6 +254,82 @@ export class DefaultRepeaterServer implements RepeaterServer {
);
}

private handleConnectionError = (err: Error) => {
const { data } = err as unknown as {
data?: Omit<RepeaterServerErrorEvent, 'transaction'>;
};

// If the error is not related to the repeater, we should ignore it
if (!data?.code) {
this.logConnectionError(err);

return;
}

if (this.suppressConnectionError(data)) {
this.events.emit(RepeaterServerEvents.ERROR, {
...data,
message: err.message
});

return;
}

if (this.connectionAttempts >= this.MAX_RECONNECTION_ATTEMPTS) {
this.events.emit(RepeaterServerEvents.RECONNECTION_FAILED, {
error: err
} as RepeaterServerReconnectionFailedEvent);

return;
}

// If the error is not related to the authentication, we should manually reconnect
this.scheduleReconnection();
};

private suppressConnectionError(
data: Omit<RepeaterServerErrorEvent, 'transaction'>
) {
return [
RepeaterErrorCodes.REPEATER_UNAUTHORIZED,
RepeaterErrorCodes.REPEATER_NOT_PERMITTED
].includes(data.code);
}

private scheduleReconnection() {
let delay = Math.max(
this.MIN_RECONNECTION_DELAY * 2 ** this.connectionAttempts,
this.MIN_RECONNECTION_DELAY
);
delay += delay * 0.3 * Math.random();
delay = Math.min(delay, this.MAX_RECONNECTION_DELAY);

this.connectionAttempts++;

this.events.emit(RepeaterServerEvents.RECONNECT_ATTEMPT, {
attempt: this.connectionAttempts,
maxAttempts: this.MAX_RECONNECTION_ATTEMPTS
} as RepeaterServerReconnectionAttemptedEvent);
this.connectionTimer = setTimeout(() => this.socket.connect(), delay);
}

private logConnectionError(err: Error) {
logger.debug(
'An error occurred while connecting to the repeater: %s',
err.message
);

const { description, cause } = err as {
description?: ErrorEvent;
cause?: Error;
};
const nestedError = description?.error ?? cause;

if (nestedError) {
logger.debug('The error cause: %s', nestedError.message);
}
}

private async wrapEventListener<TArgs extends TArg[], TArg>(
event: string,
handler: (...payload: TArgs) => unknown,
Expand Down Expand Up @@ -291,6 +359,31 @@ export class DefaultRepeaterServer implements RepeaterServer {
}
}

private clearConnectionTimer() {
if (this.connectionTimer) {
clearTimeout(this.connectionTimer);
}
}

private handleConnect = () => {
this.connectionAttempts = 0;
this.clearConnectionTimer();
this.events.emit(RepeaterServerEvents.CONNECTED);
};

private handleDisconnect = (reason: string): void => {
this.clearPingTimer();

if (reason !== 'io client disconnect') {
this.events.emit(RepeaterServerEvents.DISCONNECTED);
}

// the disconnection was initiated by the server, you need to reconnect manually
if (reason === 'io server disconnect') {
this.socket.connect();
}
};

private handleEventError(error: Error, event: string, args: unknown[]): void {
captureException(error);
logger.debug(
Expand All @@ -304,15 +397,15 @@ export class DefaultRepeaterServer implements RepeaterServer {
private createPingTimer() {
this.clearPingTimer();

this.timer = setInterval(
this.pingTimer = setInterval(
() => this.socket.volatile.emit(SocketEvents.PING),
this.MAX_PING_INTERVAL
).unref();
}

private clearPingTimer() {
if (this.timer) {
clearInterval(this.timer);
if (this.pingTimer) {
clearInterval(this.pingTimer);
}
}
}
Loading